fix: clear 10 more services from pyright ignore list

A third pass over low-error-count files in the ignore list. Drops 10
files (67 → 57) and full-pyright errors from 555 → 525. Default
pyright stays clean.

Optional access guards (4 files). The same fix shape as 9e9b1f39e:
a receiver typed `X | None` accessed without a guard, fixed with a
local-var capture or an early return.

- `mistral/stt.py`: `_connection.send_audio` could crash if
  `_connect()` swallowed an exception and left `_connection` unset;
  drop the audio chunk with a warning instead. `_receive_events`
  iterating `_connection.events()` got the same defensive narrowing.
- `deepgram/flux/stt.py`: `_websocket_url` is set in `_connect`
  before `_connect_websocket` is called, but pyright doesn't track
  that across methods — assert at the use site. `websocket.response`
  is `Response | None` in the websockets stubs even though it's
  always populated post-handshake; guarded with a fallback.
- `audio/filters/rnnoise_filter.py`: the module-level import sets
  `RNNoise` to `None` if `pyrnnoise` isn't installed; raise
  `ImportError` explicitly instead of relying on the existing try-
  block to catch the `None(...)` call. Also gated `filter()` with
  `or self._rnnoise is None` so pyright sees the narrowing.
- `transports/smallwebrtc/request_handler.py`: `get_answer()`
  legitimately returns `None`; raise instead of crashing on three
  subscript accesses.

`TTSService` `audio-context` API tightening. Mirroring the
`append_to_audio_context` fix from the previous batch:
`remove_audio_context` was typed `str` but is called with `str | None`
from `get_active_audio_context_id()` results. Widened to `str | None`
and the `None` handling lives in the function body (early debug log
+ return) — matching `append_to_audio_context`'s shape.
`audio_context_available` keeps its narrow `str` signature; asking
"is `None` available?" isn't a meaningful question (`_audio_contexts`
is `dict[str, asyncio.Queue]`). The internal call site in
`on_turn_context_completed` narrows `_turn_context_id` explicitly
before passing it. Side effect: deepgram/tts.py's L307 error clears
without local changes.

`deepgram/tts.py` (4 errors → 0): the same `push_error(ErrorFrame(...))`
latent bug we fixed in resembleai earlier in this PR — `push_error`
takes a string; there's a separate `push_error_frame` for frames.
Two sites switched. The Optional `_websocket.response` access is
guarded the same way as deepgram/flux/stt.py. The `remove_audio_context`
error was cleared by the tightening above.

`aws/utils.py` (3 errors → 0): `AWSTranscribePresignedURL` declared
`session_token: str` but the dict source is `str | None` (AWS
supports long-term IAM creds without a session token). Same for
`vocabulary_name`/`vocabulary_filter_name` on `get_request_url`,
which were typed `str = ""` even though the body uses truthy checks
to skip them. Widened to `str | None = None` — matches actual
runtime semantics.

`audio/dtmf/utils.py` (2 errors → 0): `files("...").joinpath(...)`
returns a `Traversable`, but `aiofiles.open` wants a real path. For
regular pip installs this worked in practice (Traversable was a
`Path`), but it would fail for zipped distributions (zipapp,
zipimport) where the resource isn't on disk. Wrapped in
`importlib.resources.as_file(...)` — the canonical bridge that
extracts to a temp file when the resource isn't already on the
filesystem. Validated end-to-end: regular install still reads bytes;
ad-hoc zipapp test confirmed `as_file` extracts the resource and
returns a real Path.

`openai/image.py` (2 errors → 0): the `size` arg to
`images.generate` is `Literal[...] | None` in the SDK but our
settings field is `str | None`. Mirrored the `groq/tts.py`
hint-not-constraint pattern from the previous batch: defined a
module-level `OpenAIImageSize = Literal[...]` alias with a comment
attributing the upstream symbol and documenting the cast contract
(callers can pass any string; invalid values surface as an OpenAI
API error). Also guarded `image.data[0]` (response.data is
`list[Image] | None`).

`processors/frameworks/{langchain,strands_agents}.py` (4 + 4 → 0):
both processors do `messages[-1]["content"]` on a value typed
`LLMStandardMessage | LLMSpecificMessage` (the latter is a dataclass,
not a dict, so `__getitem__` errors). Historically these only
handled plain-text user messages, so the fix is two explicit guards
(skip if the last message isn't a dict; skip if `content` isn't a
string) plus a TODO noting that other shapes (multi-modal content,
provider-specific messages) aren't supported yet. langchain's
`__get_token_value` also got a small fix where `AIMessageChunk.content`
is `str | list[parts]` but the function declares `-> str`; stringify
the list case. strands_agents' surfaced two unrelated narrows: a
`graph_exit_node: str | None` arg gated by an `__init__`-time assert,
and `agent.stream_async` reached only when we're not in graph mode.

Files dropped from the ignore list (67 → 57):
audio/dtmf/utils.py, audio/filters/rnnoise_filter.py,
processors/frameworks/langchain.py,
processors/frameworks/strands_agents.py, services/aws/utils.py,
services/deepgram/flux/stt.py, services/deepgram/tts.py,
services/mistral/stt.py, services/openai/image.py,
transports/smallwebrtc/request_handler.py.
This commit is contained in:
Paul Kompfner
2026-04-29 15:23:18 -04:00
parent 31ff07916f
commit 26a40e2e62
12 changed files with 126 additions and 45 deletions

View File

@@ -6,30 +6,23 @@
"exclude": ["**/*_pb2.py", "**/__pycache__"],
"ignore": [
"tests",
"src/pipecat/audio/dtmf/utils.py",
"src/pipecat/audio/filters/aic_filter.py",
"src/pipecat/audio/filters/krisp_viva_filter.py",
"src/pipecat/audio/filters/rnnoise_filter.py",
"src/pipecat/audio/turn/smart_turn/local_smart_turn_v2.py",
"src/pipecat/audio/turn/smart_turn/local_smart_turn_v3.py",
"src/pipecat/audio/vad/silero.py",
"src/pipecat/processors/aggregators/llm_response_universal.py",
"src/pipecat/processors/frame_processor.py",
"src/pipecat/processors/frameworks/langchain.py",
"src/pipecat/processors/frameworks/rtvi/observer.py",
"src/pipecat/processors/frameworks/strands_agents.py",
"src/pipecat/services/anthropic/llm.py",
"src/pipecat/services/aws/llm.py",
"src/pipecat/services/aws/nova_sonic/llm.py",
"src/pipecat/services/aws/sagemaker/bidi_client.py",
"src/pipecat/services/aws/utils.py",
"src/pipecat/services/azure/tts.py",
"src/pipecat/services/deepgram/flux/base.py",
"src/pipecat/services/deepgram/flux/sagemaker/stt.py",
"src/pipecat/services/deepgram/flux/stt.py",
"src/pipecat/services/deepgram/sagemaker/stt.py",
"src/pipecat/services/deepgram/sagemaker/tts.py",
"src/pipecat/services/deepgram/tts.py",
"src/pipecat/services/elevenlabs/tts.py",
"src/pipecat/services/google/gemini_live/llm.py",
"src/pipecat/services/google/gemini_live/vertex/llm.py",
@@ -41,13 +34,11 @@
"src/pipecat/services/inworld/realtime/llm.py",
"src/pipecat/services/llm_service.py",
"src/pipecat/services/mem0/memory.py",
"src/pipecat/services/mistral/stt.py",
"src/pipecat/services/mistral/tts.py",
"src/pipecat/services/moondream/vision.py",
"src/pipecat/services/nvidia/stt.py",
"src/pipecat/services/nvidia/tts.py",
"src/pipecat/services/openai/base_llm.py",
"src/pipecat/services/openai/image.py",
"src/pipecat/services/openai/realtime/llm.py",
"src/pipecat/services/openai/stt.py",
"src/pipecat/services/rime/tts.py",
@@ -66,7 +57,6 @@
"src/pipecat/transports/lemonslice/transport.py",
"src/pipecat/transports/livekit/transport.py",
"src/pipecat/transports/smallwebrtc/connection.py",
"src/pipecat/transports/smallwebrtc/request_handler.py",
"src/pipecat/transports/smallwebrtc/transport.py",
"src/pipecat/transports/tavus/transport.py",
"src/pipecat/transports/websocket/client.py",

View File

@@ -14,7 +14,7 @@ in-memory after first load to improve performance on subsequent accesses.
import asyncio
import io
import wave
from importlib.resources import files
from importlib.resources import as_file, files
import aiofiles
@@ -52,10 +52,12 @@ async def load_dtmf_audio(button: KeypadEntry, *, sample_rate: int = 8000) -> by
__DTMF_RESAMPLER__ = create_file_resampler()
dtmf_file_name = __DTMF_FILE_NAME.get(button, f"dtmf-{button.value}.wav")
dtmf_file_path = files("pipecat.audio.dtmf").joinpath(dtmf_file_name)
async with aiofiles.open(dtmf_file_path, "rb") as f:
data = await f.read()
# `as_file` materialises the resource as a real filesystem `Path`,
# which aiofiles can open. (For installed packages this is just the
# bundled file; for zipped distributions it would extract to a temp.)
with as_file(files("pipecat.audio.dtmf").joinpath(dtmf_file_name)) as dtmf_file_path:
async with aiofiles.open(dtmf_file_path, "rb") as f:
data = await f.read()
with io.BytesIO(data) as buffer:
with wave.open(buffer, "rb") as wf:

View File

@@ -60,7 +60,12 @@ class RNNoiseFilter(BaseAudioFilter):
self._sample_rate = sample_rate
try:
# RNNoise always requires 48kHz
# The module-level import sets `RNNoise` to `None` if pyrnnoise
# isn't installed; raise instead of calling `None(...)` so the
# except clause handles it cleanly.
if RNNoise is None:
raise ImportError("pyrnnoise is not installed")
# RNNoise always requires 48kHz.
self._rnnoise = RNNoise(sample_rate=48000)
self._rnnoise_ready = True
except Exception as e:
@@ -107,7 +112,7 @@ class RNNoiseFilter(BaseAudioFilter):
Returns:
Noise-suppressed audio data as bytes.
"""
if not self._rnnoise_ready or not self._filtering:
if not self._rnnoise_ready or not self._filtering or self._rnnoise is None:
return audio
# Resample input if needed

View File

@@ -67,9 +67,20 @@ class LangchainProcessor(FrameProcessor):
# The last one by the human is the one we want to send to the LLM.
logger.debug(f"Got transcription frame {frame}")
messages = frame.context.get_messages()
text: str = messages[-1]["content"]
# Historically this processor has only handled plain-text user
# messages; the guards below make that contract explicit for the
# type checker. TODO: maybe handle other message shapes (provider-
# specific messages, multi-modal content lists, etc.).
last_message = messages[-1] if messages else None
if not isinstance(last_message, dict):
await self.push_frame(frame, direction)
return
content = last_message.get("content")
if not isinstance(content, str):
await self.push_frame(frame, direction)
return
await self._ainvoke(text.strip())
await self._ainvoke(content.strip())
else:
await self.push_frame(frame, direction)
@@ -87,7 +98,10 @@ class LangchainProcessor(FrameProcessor):
case str():
return text
case AIMessageChunk():
return text.content
# `content` is `str | list[...]` (multi-modal); stringify if
# it's a list, since downstream consumers want plain text.
content = text.content
return content if isinstance(content, str) else str(content)
case _:
return ""

View File

@@ -71,9 +71,15 @@ class StrandsAgentsProcessor(FrameProcessor):
await super().process_frame(frame, direction)
if isinstance(frame, LLMContextFrame):
messages = frame.context.get_messages()
if messages:
last_message = messages[-1]
await self._ainvoke(str(last_message["content"]).strip())
# Historically this processor has only handled plain-text user
# messages; the guards below make that contract explicit for the
# type checker. TODO: handle other message shapes (provider-
# specific messages, multi-modal content lists, etc.).
last_message = messages[-1] if messages else None
if isinstance(last_message, dict):
content = last_message.get("content")
if isinstance(content, str):
await self._ainvoke(content.strip())
else:
await self.push_frame(frame, direction)
@@ -91,6 +97,9 @@ class StrandsAgentsProcessor(FrameProcessor):
await self.start_ttfb_metrics()
if self.graph:
# `__init__` asserts `graph_exit_node` is set whenever `graph`
# is, so this can't be None here.
assert self.graph_exit_node is not None
# Graph does not stream; await full result then emit assistant text
graph_result = await self.graph.invoke_async(text)
if ttfb_tracking:
@@ -115,6 +124,9 @@ class StrandsAgentsProcessor(FrameProcessor):
except Exception as parse_err:
logger.warning(f"Failed to extract messages from GraphResult: {parse_err}")
else:
# `__init__` asserts at least one of `agent`/`graph` is set,
# and we're in the `not self.graph` branch.
assert self.agent is not None
# Agent supports streaming events via async iterator
async for event in self.agent.stream_async(text):
# Push to TTS service

View File

@@ -92,14 +92,18 @@ class AWSTranscribePresignedURL:
"""
def __init__(
self, access_key: str, secret_key: str, session_token: str, region: str = "us-east-1"
self,
access_key: str,
secret_key: str,
session_token: str | None,
region: str = "us-east-1",
):
"""Initialize the presigned URL generator.
Args:
access_key: AWS access key ID.
secret_key: AWS secret access key.
session_token: AWS session token for temporary credentials.
session_token: AWS session token for temporary credentials (optional).
region: AWS region for the service. Defaults to "us-east-1".
"""
self.access_key = access_key
@@ -129,8 +133,8 @@ class AWSTranscribePresignedURL:
sample_rate: int,
language_code: str = "",
media_encoding: str = "pcm",
vocabulary_name: str = "",
vocabulary_filter_name: str = "",
vocabulary_name: str | None = None,
vocabulary_filter_name: str | None = None,
show_speaker_label: bool = False,
enable_channel_identification: bool = False,
number_of_channels: int = 1,

View File

@@ -299,14 +299,19 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService):
self._connection_established_event.clear()
self._user_is_speaking = False
self._websocket = await websocket_connect(
# `_connect` sets `_websocket_url` before calling us; the assert
# narrows for pyright.
assert self._websocket_url is not None
websocket = await websocket_connect(
self._websocket_url,
additional_headers={"Authorization": f"Token {self._api_key}"},
)
self._websocket = websocket
headers = {
k: v for k, v in self._websocket.response.headers.items() if k.startswith("dg-")
}
# `response` is populated after the handshake completes (which it
# has, since `websocket_connect` already returned).
response_headers = websocket.response.headers if websocket.response else {}
headers = {k: v for k, v in response_headers.items() if k.startswith("dg-")}
logger.debug(f'{self}: Websocket connection initialized: {{"headers": {headers}}}')
# Creating the receiver task

View File

@@ -232,17 +232,19 @@ class DeepgramTTSService(WebsocketTTSService):
headers = {"Authorization": f"Token {self._api_key}"}
self._websocket = await websocket_connect(url, additional_headers=headers)
websocket = await websocket_connect(url, additional_headers=headers)
self._websocket = websocket
headers = {
k: v for k, v in self._websocket.response.headers.items() if k.startswith("dg-")
}
# `response` is populated after the handshake completes (which it
# has, since `websocket_connect` already returned).
response_headers = websocket.response.headers if websocket.response else {}
headers = {k: v for k, v in response_headers.items() if k.startswith("dg-")}
logger.debug(f'{self}: Websocket connection initialized: {{"headers": {headers}}}')
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error_frame(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -258,7 +260,7 @@ class DeepgramTTSService(WebsocketTTSService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error_frame(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")

View File

@@ -198,6 +198,13 @@ class MistralSTTService(STTService):
if not self._connection or self._connection.is_closed:
await self._connect()
# `_connect` swallows exceptions and may leave `_connection` unset;
# drop the audio chunk rather than crashing if reconnect failed.
if self._connection is None:
logger.warning(f"{self}: dropping audio chunk — Mistral STT not connected")
yield None
return
await self._connection.send_audio(audio)
yield None
@@ -248,8 +255,13 @@ class MistralSTTService(STTService):
async def _receive_events(self):
"""Background task: iterate connection events and handle them."""
# `_connect` started this task only after assigning `_connection`,
# so it should not be None here; bail out defensively just in case.
connection = self._connection
if connection is None:
return
try:
async for event in self._connection.events():
async for event in connection.events():
if isinstance(event, RealtimeTranscriptionSessionCreated):
logger.debug(f"{self}: Session created: {event.session}")
await self._call_event_handler("on_connected")

View File

@@ -13,7 +13,7 @@ for creating images from text prompts.
import io
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from typing import Literal
from typing import Literal, cast
import aiohttp
from loguru import logger
@@ -28,6 +28,28 @@ from pipecat.frames.frames import (
from pipecat.services.image_service import ImageGenService
from pipecat.services.settings import NOT_GIVEN, ImageGenSettings, _NotGiven, assert_given
# Hint set for the `size` argument to `images.generate`. The values mirror the
# Literal that `openai.resources.images.Images.generate` accepts on its `size`
# parameter (also visible as the `size` field of
# `openai.types.image_generate_params.ImageGenerateParams`). The OpenAI SDK
# does not export this as a named alias, so we redeclare it here.
#
# We cast `_settings.image_size` (a plain `str`) to this Literal at the API
# boundary so callers can still pass any size string (e.g. a newer value the
# SDK accepts before this list is updated). Invalid values surface as an
# OpenAI API error at runtime. Keep in sync on a best-effort basis when
# bumping the openai dep.
OpenAIImageSize = Literal[
"auto",
"1024x1024",
"1536x1024",
"1024x1536",
"256x256",
"512x512",
"1792x1024",
"1024x1792",
]
@dataclass
class OpenAIImageGenSettings(ImageGenSettings):
@@ -116,15 +138,19 @@ class OpenAIImageGenService(ImageGenService):
"""
logger.debug(f"Generating image from prompt: {prompt}")
size = cast(OpenAIImageSize | None, assert_given(self._settings.image_size))
image = await self._client.images.generate(
prompt=prompt,
model=assert_given(self._settings.model),
n=1,
size=assert_given(self._settings.image_size),
size=size,
)
image_url = image.data[0].url
if not image.data:
yield ErrorFrame("Image generation failed: no data returned")
return
image_url = image.data[0].url
if not image_url:
yield ErrorFrame("Image generation failed")
return

View File

@@ -612,8 +612,10 @@ class TTSService(AIService):
"""Handle the completion of a turn."""
# For HTTP services they emit the frames synchronously, so close the audio context here
# once all frames (including TTSTextFrame above) have been enqueued.
if self._is_yielding_frames_synchronously and self.audio_context_available(
self._turn_context_id
if (
self._is_yielding_frames_synchronously
and self._turn_context_id is not None
and self.audio_context_available(self._turn_context_id)
):
if self._push_stop_frames:
await self.append_to_audio_context(
@@ -1206,12 +1208,17 @@ class TTSService(AIService):
else:
logger.debug(f"{self} unable to append audio to context {context_id}")
async def remove_audio_context(self, context_id: str):
async def remove_audio_context(self, context_id: str | None):
"""Remove an existing audio context.
Args:
context_id: The context to remove.
context_id: The context to remove. ``None`` is accepted as a
no-op (logged) so callers can pass through values from
``get_active_audio_context_id()`` without an explicit guard.
"""
if not context_id:
logger.debug(f"{self} unable to remove audio context: no context ID provided")
return
if self.audio_context_available(context_id):
# We just mark the audio context for deletion by appending
# None. Once we reach None while handling audio we know we can

View File

@@ -224,6 +224,8 @@ class SmallWebRTCRequestHandler:
)
answer = pipecat_connection.get_answer()
if answer is None:
raise RuntimeError("SmallWebRTC connection produced no SDP answer")
if self._esp32_mode:
from pipecat.runner.utils import smallwebrtc_sdp_munging