From 26a40e2e629e90f3e213960ff8e278fed0c0eed8 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Wed, 29 Apr 2026 15:23:18 -0400 Subject: [PATCH] fix: clear 10 more services from pyright ignore list MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A third pass over low-error-count files in the ignore list. Drops 10 files (67 → 57) and full-pyright errors from 555 → 525. Default pyright stays clean. Optional access guards (4 files). The same fix shape as 9e9b1f39e: a receiver typed `X | None` accessed without a guard, fixed with a local-var capture or an early return. - `mistral/stt.py`: `_connection.send_audio` could crash if `_connect()` swallowed an exception and left `_connection` unset; drop the audio chunk with a warning instead. `_receive_events` iterating `_connection.events()` got the same defensive narrowing. - `deepgram/flux/stt.py`: `_websocket_url` is set in `_connect` before `_connect_websocket` is called, but pyright doesn't track that across methods — assert at the use site. `websocket.response` is `Response | None` in the websockets stubs even though it's always populated post-handshake; guarded with a fallback. - `audio/filters/rnnoise_filter.py`: the module-level import sets `RNNoise` to `None` if `pyrnnoise` isn't installed; raise `ImportError` explicitly instead of relying on the existing try- block to catch the `None(...)` call. Also gated `filter()` with `or self._rnnoise is None` so pyright sees the narrowing. - `transports/smallwebrtc/request_handler.py`: `get_answer()` legitimately returns `None`; raise instead of crashing on three subscript accesses. `TTSService` `audio-context` API tightening. Mirroring the `append_to_audio_context` fix from the previous batch: `remove_audio_context` was typed `str` but is called with `str | None` from `get_active_audio_context_id()` results. Widened to `str | None` and the `None` handling lives in the function body (early debug log + return) — matching `append_to_audio_context`'s shape. `audio_context_available` keeps its narrow `str` signature; asking "is `None` available?" isn't a meaningful question (`_audio_contexts` is `dict[str, asyncio.Queue]`). The internal call site in `on_turn_context_completed` narrows `_turn_context_id` explicitly before passing it. Side effect: deepgram/tts.py's L307 error clears without local changes. `deepgram/tts.py` (4 errors → 0): the same `push_error(ErrorFrame(...))` latent bug we fixed in resembleai earlier in this PR — `push_error` takes a string; there's a separate `push_error_frame` for frames. Two sites switched. The Optional `_websocket.response` access is guarded the same way as deepgram/flux/stt.py. The `remove_audio_context` error was cleared by the tightening above. `aws/utils.py` (3 errors → 0): `AWSTranscribePresignedURL` declared `session_token: str` but the dict source is `str | None` (AWS supports long-term IAM creds without a session token). Same for `vocabulary_name`/`vocabulary_filter_name` on `get_request_url`, which were typed `str = ""` even though the body uses truthy checks to skip them. Widened to `str | None = None` — matches actual runtime semantics. `audio/dtmf/utils.py` (2 errors → 0): `files("...").joinpath(...)` returns a `Traversable`, but `aiofiles.open` wants a real path. For regular pip installs this worked in practice (Traversable was a `Path`), but it would fail for zipped distributions (zipapp, zipimport) where the resource isn't on disk. Wrapped in `importlib.resources.as_file(...)` — the canonical bridge that extracts to a temp file when the resource isn't already on the filesystem. Validated end-to-end: regular install still reads bytes; ad-hoc zipapp test confirmed `as_file` extracts the resource and returns a real Path. `openai/image.py` (2 errors → 0): the `size` arg to `images.generate` is `Literal[...] | None` in the SDK but our settings field is `str | None`. Mirrored the `groq/tts.py` hint-not-constraint pattern from the previous batch: defined a module-level `OpenAIImageSize = Literal[...]` alias with a comment attributing the upstream symbol and documenting the cast contract (callers can pass any string; invalid values surface as an OpenAI API error). Also guarded `image.data[0]` (response.data is `list[Image] | None`). `processors/frameworks/{langchain,strands_agents}.py` (4 + 4 → 0): both processors do `messages[-1]["content"]` on a value typed `LLMStandardMessage | LLMSpecificMessage` (the latter is a dataclass, not a dict, so `__getitem__` errors). Historically these only handled plain-text user messages, so the fix is two explicit guards (skip if the last message isn't a dict; skip if `content` isn't a string) plus a TODO noting that other shapes (multi-modal content, provider-specific messages) aren't supported yet. langchain's `__get_token_value` also got a small fix where `AIMessageChunk.content` is `str | list[parts]` but the function declares `-> str`; stringify the list case. strands_agents' surfaced two unrelated narrows: a `graph_exit_node: str | None` arg gated by an `__init__`-time assert, and `agent.stream_async` reached only when we're not in graph mode. Files dropped from the ignore list (67 → 57): audio/dtmf/utils.py, audio/filters/rnnoise_filter.py, processors/frameworks/langchain.py, processors/frameworks/strands_agents.py, services/aws/utils.py, services/deepgram/flux/stt.py, services/deepgram/tts.py, services/mistral/stt.py, services/openai/image.py, transports/smallwebrtc/request_handler.py. --- pyrightconfig.json | 10 ------ src/pipecat/audio/dtmf/utils.py | 12 ++++--- src/pipecat/audio/filters/rnnoise_filter.py | 9 ++++-- .../processors/frameworks/langchain.py | 20 ++++++++++-- .../processors/frameworks/strands_agents.py | 18 +++++++++-- src/pipecat/services/aws/utils.py | 12 ++++--- src/pipecat/services/deepgram/flux/stt.py | 13 +++++--- src/pipecat/services/deepgram/tts.py | 14 ++++---- src/pipecat/services/mistral/stt.py | 14 +++++++- src/pipecat/services/openai/image.py | 32 +++++++++++++++++-- src/pipecat/services/tts_service.py | 15 ++++++--- .../transports/smallwebrtc/request_handler.py | 2 ++ 12 files changed, 126 insertions(+), 45 deletions(-) diff --git a/pyrightconfig.json b/pyrightconfig.json index c928acdac..db78dad8c 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -6,30 +6,23 @@ "exclude": ["**/*_pb2.py", "**/__pycache__"], "ignore": [ "tests", - "src/pipecat/audio/dtmf/utils.py", "src/pipecat/audio/filters/aic_filter.py", "src/pipecat/audio/filters/krisp_viva_filter.py", - "src/pipecat/audio/filters/rnnoise_filter.py", "src/pipecat/audio/turn/smart_turn/local_smart_turn_v2.py", "src/pipecat/audio/turn/smart_turn/local_smart_turn_v3.py", "src/pipecat/audio/vad/silero.py", "src/pipecat/processors/aggregators/llm_response_universal.py", "src/pipecat/processors/frame_processor.py", - "src/pipecat/processors/frameworks/langchain.py", "src/pipecat/processors/frameworks/rtvi/observer.py", - "src/pipecat/processors/frameworks/strands_agents.py", "src/pipecat/services/anthropic/llm.py", "src/pipecat/services/aws/llm.py", "src/pipecat/services/aws/nova_sonic/llm.py", "src/pipecat/services/aws/sagemaker/bidi_client.py", - "src/pipecat/services/aws/utils.py", "src/pipecat/services/azure/tts.py", "src/pipecat/services/deepgram/flux/base.py", "src/pipecat/services/deepgram/flux/sagemaker/stt.py", - "src/pipecat/services/deepgram/flux/stt.py", "src/pipecat/services/deepgram/sagemaker/stt.py", "src/pipecat/services/deepgram/sagemaker/tts.py", - "src/pipecat/services/deepgram/tts.py", "src/pipecat/services/elevenlabs/tts.py", "src/pipecat/services/google/gemini_live/llm.py", "src/pipecat/services/google/gemini_live/vertex/llm.py", @@ -41,13 +34,11 @@ "src/pipecat/services/inworld/realtime/llm.py", "src/pipecat/services/llm_service.py", "src/pipecat/services/mem0/memory.py", - "src/pipecat/services/mistral/stt.py", "src/pipecat/services/mistral/tts.py", "src/pipecat/services/moondream/vision.py", "src/pipecat/services/nvidia/stt.py", "src/pipecat/services/nvidia/tts.py", "src/pipecat/services/openai/base_llm.py", - "src/pipecat/services/openai/image.py", "src/pipecat/services/openai/realtime/llm.py", "src/pipecat/services/openai/stt.py", "src/pipecat/services/rime/tts.py", @@ -66,7 +57,6 @@ "src/pipecat/transports/lemonslice/transport.py", "src/pipecat/transports/livekit/transport.py", "src/pipecat/transports/smallwebrtc/connection.py", - "src/pipecat/transports/smallwebrtc/request_handler.py", "src/pipecat/transports/smallwebrtc/transport.py", "src/pipecat/transports/tavus/transport.py", "src/pipecat/transports/websocket/client.py", diff --git a/src/pipecat/audio/dtmf/utils.py b/src/pipecat/audio/dtmf/utils.py index 22026759e..dc921ea9b 100644 --- a/src/pipecat/audio/dtmf/utils.py +++ b/src/pipecat/audio/dtmf/utils.py @@ -14,7 +14,7 @@ in-memory after first load to improve performance on subsequent accesses. import asyncio import io import wave -from importlib.resources import files +from importlib.resources import as_file, files import aiofiles @@ -52,10 +52,12 @@ async def load_dtmf_audio(button: KeypadEntry, *, sample_rate: int = 8000) -> by __DTMF_RESAMPLER__ = create_file_resampler() dtmf_file_name = __DTMF_FILE_NAME.get(button, f"dtmf-{button.value}.wav") - dtmf_file_path = files("pipecat.audio.dtmf").joinpath(dtmf_file_name) - - async with aiofiles.open(dtmf_file_path, "rb") as f: - data = await f.read() + # `as_file` materialises the resource as a real filesystem `Path`, + # which aiofiles can open. (For installed packages this is just the + # bundled file; for zipped distributions it would extract to a temp.) + with as_file(files("pipecat.audio.dtmf").joinpath(dtmf_file_name)) as dtmf_file_path: + async with aiofiles.open(dtmf_file_path, "rb") as f: + data = await f.read() with io.BytesIO(data) as buffer: with wave.open(buffer, "rb") as wf: diff --git a/src/pipecat/audio/filters/rnnoise_filter.py b/src/pipecat/audio/filters/rnnoise_filter.py index 8d81d5b0f..1a316e2c5 100644 --- a/src/pipecat/audio/filters/rnnoise_filter.py +++ b/src/pipecat/audio/filters/rnnoise_filter.py @@ -60,7 +60,12 @@ class RNNoiseFilter(BaseAudioFilter): self._sample_rate = sample_rate try: - # RNNoise always requires 48kHz + # The module-level import sets `RNNoise` to `None` if pyrnnoise + # isn't installed; raise instead of calling `None(...)` so the + # except clause handles it cleanly. + if RNNoise is None: + raise ImportError("pyrnnoise is not installed") + # RNNoise always requires 48kHz. self._rnnoise = RNNoise(sample_rate=48000) self._rnnoise_ready = True except Exception as e: @@ -107,7 +112,7 @@ class RNNoiseFilter(BaseAudioFilter): Returns: Noise-suppressed audio data as bytes. """ - if not self._rnnoise_ready or not self._filtering: + if not self._rnnoise_ready or not self._filtering or self._rnnoise is None: return audio # Resample input if needed diff --git a/src/pipecat/processors/frameworks/langchain.py b/src/pipecat/processors/frameworks/langchain.py index 4400327fc..c5d746bb2 100644 --- a/src/pipecat/processors/frameworks/langchain.py +++ b/src/pipecat/processors/frameworks/langchain.py @@ -67,9 +67,20 @@ class LangchainProcessor(FrameProcessor): # The last one by the human is the one we want to send to the LLM. logger.debug(f"Got transcription frame {frame}") messages = frame.context.get_messages() - text: str = messages[-1]["content"] + # Historically this processor has only handled plain-text user + # messages; the guards below make that contract explicit for the + # type checker. TODO: maybe handle other message shapes (provider- + # specific messages, multi-modal content lists, etc.). + last_message = messages[-1] if messages else None + if not isinstance(last_message, dict): + await self.push_frame(frame, direction) + return + content = last_message.get("content") + if not isinstance(content, str): + await self.push_frame(frame, direction) + return - await self._ainvoke(text.strip()) + await self._ainvoke(content.strip()) else: await self.push_frame(frame, direction) @@ -87,7 +98,10 @@ class LangchainProcessor(FrameProcessor): case str(): return text case AIMessageChunk(): - return text.content + # `content` is `str | list[...]` (multi-modal); stringify if + # it's a list, since downstream consumers want plain text. + content = text.content + return content if isinstance(content, str) else str(content) case _: return "" diff --git a/src/pipecat/processors/frameworks/strands_agents.py b/src/pipecat/processors/frameworks/strands_agents.py index 7383cd089..ba028576e 100644 --- a/src/pipecat/processors/frameworks/strands_agents.py +++ b/src/pipecat/processors/frameworks/strands_agents.py @@ -71,9 +71,15 @@ class StrandsAgentsProcessor(FrameProcessor): await super().process_frame(frame, direction) if isinstance(frame, LLMContextFrame): messages = frame.context.get_messages() - if messages: - last_message = messages[-1] - await self._ainvoke(str(last_message["content"]).strip()) + # Historically this processor has only handled plain-text user + # messages; the guards below make that contract explicit for the + # type checker. TODO: handle other message shapes (provider- + # specific messages, multi-modal content lists, etc.). + last_message = messages[-1] if messages else None + if isinstance(last_message, dict): + content = last_message.get("content") + if isinstance(content, str): + await self._ainvoke(content.strip()) else: await self.push_frame(frame, direction) @@ -91,6 +97,9 @@ class StrandsAgentsProcessor(FrameProcessor): await self.start_ttfb_metrics() if self.graph: + # `__init__` asserts `graph_exit_node` is set whenever `graph` + # is, so this can't be None here. + assert self.graph_exit_node is not None # Graph does not stream; await full result then emit assistant text graph_result = await self.graph.invoke_async(text) if ttfb_tracking: @@ -115,6 +124,9 @@ class StrandsAgentsProcessor(FrameProcessor): except Exception as parse_err: logger.warning(f"Failed to extract messages from GraphResult: {parse_err}") else: + # `__init__` asserts at least one of `agent`/`graph` is set, + # and we're in the `not self.graph` branch. + assert self.agent is not None # Agent supports streaming events via async iterator async for event in self.agent.stream_async(text): # Push to TTS service diff --git a/src/pipecat/services/aws/utils.py b/src/pipecat/services/aws/utils.py index 2b69cf035..c14f1f20d 100644 --- a/src/pipecat/services/aws/utils.py +++ b/src/pipecat/services/aws/utils.py @@ -92,14 +92,18 @@ class AWSTranscribePresignedURL: """ def __init__( - self, access_key: str, secret_key: str, session_token: str, region: str = "us-east-1" + self, + access_key: str, + secret_key: str, + session_token: str | None, + region: str = "us-east-1", ): """Initialize the presigned URL generator. Args: access_key: AWS access key ID. secret_key: AWS secret access key. - session_token: AWS session token for temporary credentials. + session_token: AWS session token for temporary credentials (optional). region: AWS region for the service. Defaults to "us-east-1". """ self.access_key = access_key @@ -129,8 +133,8 @@ class AWSTranscribePresignedURL: sample_rate: int, language_code: str = "", media_encoding: str = "pcm", - vocabulary_name: str = "", - vocabulary_filter_name: str = "", + vocabulary_name: str | None = None, + vocabulary_filter_name: str | None = None, show_speaker_label: bool = False, enable_channel_identification: bool = False, number_of_channels: int = 1, diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index 98cf17ec2..f89b37f0b 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -299,14 +299,19 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): self._connection_established_event.clear() self._user_is_speaking = False - self._websocket = await websocket_connect( + # `_connect` sets `_websocket_url` before calling us; the assert + # narrows for pyright. + assert self._websocket_url is not None + websocket = await websocket_connect( self._websocket_url, additional_headers={"Authorization": f"Token {self._api_key}"}, ) + self._websocket = websocket - headers = { - k: v for k, v in self._websocket.response.headers.items() if k.startswith("dg-") - } + # `response` is populated after the handshake completes (which it + # has, since `websocket_connect` already returned). + response_headers = websocket.response.headers if websocket.response else {} + headers = {k: v for k, v in response_headers.items() if k.startswith("dg-")} logger.debug(f'{self}: Websocket connection initialized: {{"headers": {headers}}}') # Creating the receiver task diff --git a/src/pipecat/services/deepgram/tts.py b/src/pipecat/services/deepgram/tts.py index 58b0fb81a..c1bb34083 100644 --- a/src/pipecat/services/deepgram/tts.py +++ b/src/pipecat/services/deepgram/tts.py @@ -232,17 +232,19 @@ class DeepgramTTSService(WebsocketTTSService): headers = {"Authorization": f"Token {self._api_key}"} - self._websocket = await websocket_connect(url, additional_headers=headers) + websocket = await websocket_connect(url, additional_headers=headers) + self._websocket = websocket - headers = { - k: v for k, v in self._websocket.response.headers.items() if k.startswith("dg-") - } + # `response` is populated after the handshake completes (which it + # has, since `websocket_connect` already returned). + response_headers = websocket.response.headers if websocket.response else {} + headers = {k: v for k, v in response_headers.items() if k.startswith("dg-")} logger.debug(f'{self}: Websocket connection initialized: {{"headers": {headers}}}') await self._call_event_handler("on_connected") except Exception as e: logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error_frame(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -258,7 +260,7 @@ class DeepgramTTSService(WebsocketTTSService): await self._websocket.close() except Exception as e: logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error_frame(ErrorFrame(error=f"{self} error: {e}")) finally: self._websocket = None await self._call_event_handler("on_disconnected") diff --git a/src/pipecat/services/mistral/stt.py b/src/pipecat/services/mistral/stt.py index bbbeddf04..d2f212663 100644 --- a/src/pipecat/services/mistral/stt.py +++ b/src/pipecat/services/mistral/stt.py @@ -198,6 +198,13 @@ class MistralSTTService(STTService): if not self._connection or self._connection.is_closed: await self._connect() + # `_connect` swallows exceptions and may leave `_connection` unset; + # drop the audio chunk rather than crashing if reconnect failed. + if self._connection is None: + logger.warning(f"{self}: dropping audio chunk — Mistral STT not connected") + yield None + return + await self._connection.send_audio(audio) yield None @@ -248,8 +255,13 @@ class MistralSTTService(STTService): async def _receive_events(self): """Background task: iterate connection events and handle them.""" + # `_connect` started this task only after assigning `_connection`, + # so it should not be None here; bail out defensively just in case. + connection = self._connection + if connection is None: + return try: - async for event in self._connection.events(): + async for event in connection.events(): if isinstance(event, RealtimeTranscriptionSessionCreated): logger.debug(f"{self}: Session created: {event.session}") await self._call_event_handler("on_connected") diff --git a/src/pipecat/services/openai/image.py b/src/pipecat/services/openai/image.py index 45f21a7cb..73dcf0b2b 100644 --- a/src/pipecat/services/openai/image.py +++ b/src/pipecat/services/openai/image.py @@ -13,7 +13,7 @@ for creating images from text prompts. import io from collections.abc import AsyncGenerator from dataclasses import dataclass, field -from typing import Literal +from typing import Literal, cast import aiohttp from loguru import logger @@ -28,6 +28,28 @@ from pipecat.frames.frames import ( from pipecat.services.image_service import ImageGenService from pipecat.services.settings import NOT_GIVEN, ImageGenSettings, _NotGiven, assert_given +# Hint set for the `size` argument to `images.generate`. The values mirror the +# Literal that `openai.resources.images.Images.generate` accepts on its `size` +# parameter (also visible as the `size` field of +# `openai.types.image_generate_params.ImageGenerateParams`). The OpenAI SDK +# does not export this as a named alias, so we redeclare it here. +# +# We cast `_settings.image_size` (a plain `str`) to this Literal at the API +# boundary so callers can still pass any size string (e.g. a newer value the +# SDK accepts before this list is updated). Invalid values surface as an +# OpenAI API error at runtime. Keep in sync on a best-effort basis when +# bumping the openai dep. +OpenAIImageSize = Literal[ + "auto", + "1024x1024", + "1536x1024", + "1024x1536", + "256x256", + "512x512", + "1792x1024", + "1024x1792", +] + @dataclass class OpenAIImageGenSettings(ImageGenSettings): @@ -116,15 +138,19 @@ class OpenAIImageGenService(ImageGenService): """ logger.debug(f"Generating image from prompt: {prompt}") + size = cast(OpenAIImageSize | None, assert_given(self._settings.image_size)) image = await self._client.images.generate( prompt=prompt, model=assert_given(self._settings.model), n=1, - size=assert_given(self._settings.image_size), + size=size, ) - image_url = image.data[0].url + if not image.data: + yield ErrorFrame("Image generation failed: no data returned") + return + image_url = image.data[0].url if not image_url: yield ErrorFrame("Image generation failed") return diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index 5cc4a4b45..2230e9948 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -612,8 +612,10 @@ class TTSService(AIService): """Handle the completion of a turn.""" # For HTTP services they emit the frames synchronously, so close the audio context here # once all frames (including TTSTextFrame above) have been enqueued. - if self._is_yielding_frames_synchronously and self.audio_context_available( - self._turn_context_id + if ( + self._is_yielding_frames_synchronously + and self._turn_context_id is not None + and self.audio_context_available(self._turn_context_id) ): if self._push_stop_frames: await self.append_to_audio_context( @@ -1206,12 +1208,17 @@ class TTSService(AIService): else: logger.debug(f"{self} unable to append audio to context {context_id}") - async def remove_audio_context(self, context_id: str): + async def remove_audio_context(self, context_id: str | None): """Remove an existing audio context. Args: - context_id: The context to remove. + context_id: The context to remove. ``None`` is accepted as a + no-op (logged) so callers can pass through values from + ``get_active_audio_context_id()`` without an explicit guard. """ + if not context_id: + logger.debug(f"{self} unable to remove audio context: no context ID provided") + return if self.audio_context_available(context_id): # We just mark the audio context for deletion by appending # None. Once we reach None while handling audio we know we can diff --git a/src/pipecat/transports/smallwebrtc/request_handler.py b/src/pipecat/transports/smallwebrtc/request_handler.py index 63c9bea14..13bf7127c 100644 --- a/src/pipecat/transports/smallwebrtc/request_handler.py +++ b/src/pipecat/transports/smallwebrtc/request_handler.py @@ -224,6 +224,8 @@ class SmallWebRTCRequestHandler: ) answer = pipecat_connection.get_answer() + if answer is None: + raise RuntimeError("SmallWebRTC connection produced no SDP answer") if self._esp32_mode: from pipecat.runner.utils import smallwebrtc_sdp_munging