diff --git a/pyrightconfig.json b/pyrightconfig.json index c928acdac..db78dad8c 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -6,30 +6,23 @@ "exclude": ["**/*_pb2.py", "**/__pycache__"], "ignore": [ "tests", - "src/pipecat/audio/dtmf/utils.py", "src/pipecat/audio/filters/aic_filter.py", "src/pipecat/audio/filters/krisp_viva_filter.py", - "src/pipecat/audio/filters/rnnoise_filter.py", "src/pipecat/audio/turn/smart_turn/local_smart_turn_v2.py", "src/pipecat/audio/turn/smart_turn/local_smart_turn_v3.py", "src/pipecat/audio/vad/silero.py", "src/pipecat/processors/aggregators/llm_response_universal.py", "src/pipecat/processors/frame_processor.py", - "src/pipecat/processors/frameworks/langchain.py", "src/pipecat/processors/frameworks/rtvi/observer.py", - "src/pipecat/processors/frameworks/strands_agents.py", "src/pipecat/services/anthropic/llm.py", "src/pipecat/services/aws/llm.py", "src/pipecat/services/aws/nova_sonic/llm.py", "src/pipecat/services/aws/sagemaker/bidi_client.py", - "src/pipecat/services/aws/utils.py", "src/pipecat/services/azure/tts.py", "src/pipecat/services/deepgram/flux/base.py", "src/pipecat/services/deepgram/flux/sagemaker/stt.py", - "src/pipecat/services/deepgram/flux/stt.py", "src/pipecat/services/deepgram/sagemaker/stt.py", "src/pipecat/services/deepgram/sagemaker/tts.py", - "src/pipecat/services/deepgram/tts.py", "src/pipecat/services/elevenlabs/tts.py", "src/pipecat/services/google/gemini_live/llm.py", "src/pipecat/services/google/gemini_live/vertex/llm.py", @@ -41,13 +34,11 @@ "src/pipecat/services/inworld/realtime/llm.py", "src/pipecat/services/llm_service.py", "src/pipecat/services/mem0/memory.py", - "src/pipecat/services/mistral/stt.py", "src/pipecat/services/mistral/tts.py", "src/pipecat/services/moondream/vision.py", "src/pipecat/services/nvidia/stt.py", "src/pipecat/services/nvidia/tts.py", "src/pipecat/services/openai/base_llm.py", - "src/pipecat/services/openai/image.py", "src/pipecat/services/openai/realtime/llm.py", "src/pipecat/services/openai/stt.py", "src/pipecat/services/rime/tts.py", @@ -66,7 +57,6 @@ "src/pipecat/transports/lemonslice/transport.py", "src/pipecat/transports/livekit/transport.py", "src/pipecat/transports/smallwebrtc/connection.py", - "src/pipecat/transports/smallwebrtc/request_handler.py", "src/pipecat/transports/smallwebrtc/transport.py", "src/pipecat/transports/tavus/transport.py", "src/pipecat/transports/websocket/client.py", diff --git a/src/pipecat/audio/dtmf/utils.py b/src/pipecat/audio/dtmf/utils.py index 22026759e..dc921ea9b 100644 --- a/src/pipecat/audio/dtmf/utils.py +++ b/src/pipecat/audio/dtmf/utils.py @@ -14,7 +14,7 @@ in-memory after first load to improve performance on subsequent accesses. import asyncio import io import wave -from importlib.resources import files +from importlib.resources import as_file, files import aiofiles @@ -52,10 +52,12 @@ async def load_dtmf_audio(button: KeypadEntry, *, sample_rate: int = 8000) -> by __DTMF_RESAMPLER__ = create_file_resampler() dtmf_file_name = __DTMF_FILE_NAME.get(button, f"dtmf-{button.value}.wav") - dtmf_file_path = files("pipecat.audio.dtmf").joinpath(dtmf_file_name) - - async with aiofiles.open(dtmf_file_path, "rb") as f: - data = await f.read() + # `as_file` materialises the resource as a real filesystem `Path`, + # which aiofiles can open. (For installed packages this is just the + # bundled file; for zipped distributions it would extract to a temp.) + with as_file(files("pipecat.audio.dtmf").joinpath(dtmf_file_name)) as dtmf_file_path: + async with aiofiles.open(dtmf_file_path, "rb") as f: + data = await f.read() with io.BytesIO(data) as buffer: with wave.open(buffer, "rb") as wf: diff --git a/src/pipecat/audio/filters/rnnoise_filter.py b/src/pipecat/audio/filters/rnnoise_filter.py index 8d81d5b0f..1a316e2c5 100644 --- a/src/pipecat/audio/filters/rnnoise_filter.py +++ b/src/pipecat/audio/filters/rnnoise_filter.py @@ -60,7 +60,12 @@ class RNNoiseFilter(BaseAudioFilter): self._sample_rate = sample_rate try: - # RNNoise always requires 48kHz + # The module-level import sets `RNNoise` to `None` if pyrnnoise + # isn't installed; raise instead of calling `None(...)` so the + # except clause handles it cleanly. + if RNNoise is None: + raise ImportError("pyrnnoise is not installed") + # RNNoise always requires 48kHz. self._rnnoise = RNNoise(sample_rate=48000) self._rnnoise_ready = True except Exception as e: @@ -107,7 +112,7 @@ class RNNoiseFilter(BaseAudioFilter): Returns: Noise-suppressed audio data as bytes. """ - if not self._rnnoise_ready or not self._filtering: + if not self._rnnoise_ready or not self._filtering or self._rnnoise is None: return audio # Resample input if needed diff --git a/src/pipecat/processors/frameworks/langchain.py b/src/pipecat/processors/frameworks/langchain.py index 4400327fc..c5d746bb2 100644 --- a/src/pipecat/processors/frameworks/langchain.py +++ b/src/pipecat/processors/frameworks/langchain.py @@ -67,9 +67,20 @@ class LangchainProcessor(FrameProcessor): # The last one by the human is the one we want to send to the LLM. logger.debug(f"Got transcription frame {frame}") messages = frame.context.get_messages() - text: str = messages[-1]["content"] + # Historically this processor has only handled plain-text user + # messages; the guards below make that contract explicit for the + # type checker. TODO: maybe handle other message shapes (provider- + # specific messages, multi-modal content lists, etc.). + last_message = messages[-1] if messages else None + if not isinstance(last_message, dict): + await self.push_frame(frame, direction) + return + content = last_message.get("content") + if not isinstance(content, str): + await self.push_frame(frame, direction) + return - await self._ainvoke(text.strip()) + await self._ainvoke(content.strip()) else: await self.push_frame(frame, direction) @@ -87,7 +98,10 @@ class LangchainProcessor(FrameProcessor): case str(): return text case AIMessageChunk(): - return text.content + # `content` is `str | list[...]` (multi-modal); stringify if + # it's a list, since downstream consumers want plain text. + content = text.content + return content if isinstance(content, str) else str(content) case _: return "" diff --git a/src/pipecat/processors/frameworks/strands_agents.py b/src/pipecat/processors/frameworks/strands_agents.py index 7383cd089..ba028576e 100644 --- a/src/pipecat/processors/frameworks/strands_agents.py +++ b/src/pipecat/processors/frameworks/strands_agents.py @@ -71,9 +71,15 @@ class StrandsAgentsProcessor(FrameProcessor): await super().process_frame(frame, direction) if isinstance(frame, LLMContextFrame): messages = frame.context.get_messages() - if messages: - last_message = messages[-1] - await self._ainvoke(str(last_message["content"]).strip()) + # Historically this processor has only handled plain-text user + # messages; the guards below make that contract explicit for the + # type checker. TODO: handle other message shapes (provider- + # specific messages, multi-modal content lists, etc.). + last_message = messages[-1] if messages else None + if isinstance(last_message, dict): + content = last_message.get("content") + if isinstance(content, str): + await self._ainvoke(content.strip()) else: await self.push_frame(frame, direction) @@ -91,6 +97,9 @@ class StrandsAgentsProcessor(FrameProcessor): await self.start_ttfb_metrics() if self.graph: + # `__init__` asserts `graph_exit_node` is set whenever `graph` + # is, so this can't be None here. + assert self.graph_exit_node is not None # Graph does not stream; await full result then emit assistant text graph_result = await self.graph.invoke_async(text) if ttfb_tracking: @@ -115,6 +124,9 @@ class StrandsAgentsProcessor(FrameProcessor): except Exception as parse_err: logger.warning(f"Failed to extract messages from GraphResult: {parse_err}") else: + # `__init__` asserts at least one of `agent`/`graph` is set, + # and we're in the `not self.graph` branch. + assert self.agent is not None # Agent supports streaming events via async iterator async for event in self.agent.stream_async(text): # Push to TTS service diff --git a/src/pipecat/services/aws/utils.py b/src/pipecat/services/aws/utils.py index 2b69cf035..c14f1f20d 100644 --- a/src/pipecat/services/aws/utils.py +++ b/src/pipecat/services/aws/utils.py @@ -92,14 +92,18 @@ class AWSTranscribePresignedURL: """ def __init__( - self, access_key: str, secret_key: str, session_token: str, region: str = "us-east-1" + self, + access_key: str, + secret_key: str, + session_token: str | None, + region: str = "us-east-1", ): """Initialize the presigned URL generator. Args: access_key: AWS access key ID. secret_key: AWS secret access key. - session_token: AWS session token for temporary credentials. + session_token: AWS session token for temporary credentials (optional). region: AWS region for the service. Defaults to "us-east-1". """ self.access_key = access_key @@ -129,8 +133,8 @@ class AWSTranscribePresignedURL: sample_rate: int, language_code: str = "", media_encoding: str = "pcm", - vocabulary_name: str = "", - vocabulary_filter_name: str = "", + vocabulary_name: str | None = None, + vocabulary_filter_name: str | None = None, show_speaker_label: bool = False, enable_channel_identification: bool = False, number_of_channels: int = 1, diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index 98cf17ec2..f89b37f0b 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -299,14 +299,19 @@ class DeepgramFluxSTTService(DeepgramFluxSTTBase, WebsocketService): self._connection_established_event.clear() self._user_is_speaking = False - self._websocket = await websocket_connect( + # `_connect` sets `_websocket_url` before calling us; the assert + # narrows for pyright. + assert self._websocket_url is not None + websocket = await websocket_connect( self._websocket_url, additional_headers={"Authorization": f"Token {self._api_key}"}, ) + self._websocket = websocket - headers = { - k: v for k, v in self._websocket.response.headers.items() if k.startswith("dg-") - } + # `response` is populated after the handshake completes (which it + # has, since `websocket_connect` already returned). + response_headers = websocket.response.headers if websocket.response else {} + headers = {k: v for k, v in response_headers.items() if k.startswith("dg-")} logger.debug(f'{self}: Websocket connection initialized: {{"headers": {headers}}}') # Creating the receiver task diff --git a/src/pipecat/services/deepgram/tts.py b/src/pipecat/services/deepgram/tts.py index 58b0fb81a..c1bb34083 100644 --- a/src/pipecat/services/deepgram/tts.py +++ b/src/pipecat/services/deepgram/tts.py @@ -232,17 +232,19 @@ class DeepgramTTSService(WebsocketTTSService): headers = {"Authorization": f"Token {self._api_key}"} - self._websocket = await websocket_connect(url, additional_headers=headers) + websocket = await websocket_connect(url, additional_headers=headers) + self._websocket = websocket - headers = { - k: v for k, v in self._websocket.response.headers.items() if k.startswith("dg-") - } + # `response` is populated after the handshake completes (which it + # has, since `websocket_connect` already returned). + response_headers = websocket.response.headers if websocket.response else {} + headers = {k: v for k, v in response_headers.items() if k.startswith("dg-")} logger.debug(f'{self}: Websocket connection initialized: {{"headers": {headers}}}') await self._call_event_handler("on_connected") except Exception as e: logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error_frame(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -258,7 +260,7 @@ class DeepgramTTSService(WebsocketTTSService): await self._websocket.close() except Exception as e: logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(error=f"{self} error: {e}")) + await self.push_error_frame(ErrorFrame(error=f"{self} error: {e}")) finally: self._websocket = None await self._call_event_handler("on_disconnected") diff --git a/src/pipecat/services/mistral/stt.py b/src/pipecat/services/mistral/stt.py index bbbeddf04..d2f212663 100644 --- a/src/pipecat/services/mistral/stt.py +++ b/src/pipecat/services/mistral/stt.py @@ -198,6 +198,13 @@ class MistralSTTService(STTService): if not self._connection or self._connection.is_closed: await self._connect() + # `_connect` swallows exceptions and may leave `_connection` unset; + # drop the audio chunk rather than crashing if reconnect failed. + if self._connection is None: + logger.warning(f"{self}: dropping audio chunk — Mistral STT not connected") + yield None + return + await self._connection.send_audio(audio) yield None @@ -248,8 +255,13 @@ class MistralSTTService(STTService): async def _receive_events(self): """Background task: iterate connection events and handle them.""" + # `_connect` started this task only after assigning `_connection`, + # so it should not be None here; bail out defensively just in case. + connection = self._connection + if connection is None: + return try: - async for event in self._connection.events(): + async for event in connection.events(): if isinstance(event, RealtimeTranscriptionSessionCreated): logger.debug(f"{self}: Session created: {event.session}") await self._call_event_handler("on_connected") diff --git a/src/pipecat/services/openai/image.py b/src/pipecat/services/openai/image.py index 45f21a7cb..73dcf0b2b 100644 --- a/src/pipecat/services/openai/image.py +++ b/src/pipecat/services/openai/image.py @@ -13,7 +13,7 @@ for creating images from text prompts. import io from collections.abc import AsyncGenerator from dataclasses import dataclass, field -from typing import Literal +from typing import Literal, cast import aiohttp from loguru import logger @@ -28,6 +28,28 @@ from pipecat.frames.frames import ( from pipecat.services.image_service import ImageGenService from pipecat.services.settings import NOT_GIVEN, ImageGenSettings, _NotGiven, assert_given +# Hint set for the `size` argument to `images.generate`. The values mirror the +# Literal that `openai.resources.images.Images.generate` accepts on its `size` +# parameter (also visible as the `size` field of +# `openai.types.image_generate_params.ImageGenerateParams`). The OpenAI SDK +# does not export this as a named alias, so we redeclare it here. +# +# We cast `_settings.image_size` (a plain `str`) to this Literal at the API +# boundary so callers can still pass any size string (e.g. a newer value the +# SDK accepts before this list is updated). Invalid values surface as an +# OpenAI API error at runtime. Keep in sync on a best-effort basis when +# bumping the openai dep. +OpenAIImageSize = Literal[ + "auto", + "1024x1024", + "1536x1024", + "1024x1536", + "256x256", + "512x512", + "1792x1024", + "1024x1792", +] + @dataclass class OpenAIImageGenSettings(ImageGenSettings): @@ -116,15 +138,19 @@ class OpenAIImageGenService(ImageGenService): """ logger.debug(f"Generating image from prompt: {prompt}") + size = cast(OpenAIImageSize | None, assert_given(self._settings.image_size)) image = await self._client.images.generate( prompt=prompt, model=assert_given(self._settings.model), n=1, - size=assert_given(self._settings.image_size), + size=size, ) - image_url = image.data[0].url + if not image.data: + yield ErrorFrame("Image generation failed: no data returned") + return + image_url = image.data[0].url if not image_url: yield ErrorFrame("Image generation failed") return diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py index 5cc4a4b45..2230e9948 100644 --- a/src/pipecat/services/tts_service.py +++ b/src/pipecat/services/tts_service.py @@ -612,8 +612,10 @@ class TTSService(AIService): """Handle the completion of a turn.""" # For HTTP services they emit the frames synchronously, so close the audio context here # once all frames (including TTSTextFrame above) have been enqueued. - if self._is_yielding_frames_synchronously and self.audio_context_available( - self._turn_context_id + if ( + self._is_yielding_frames_synchronously + and self._turn_context_id is not None + and self.audio_context_available(self._turn_context_id) ): if self._push_stop_frames: await self.append_to_audio_context( @@ -1206,12 +1208,17 @@ class TTSService(AIService): else: logger.debug(f"{self} unable to append audio to context {context_id}") - async def remove_audio_context(self, context_id: str): + async def remove_audio_context(self, context_id: str | None): """Remove an existing audio context. Args: - context_id: The context to remove. + context_id: The context to remove. ``None`` is accepted as a + no-op (logged) so callers can pass through values from + ``get_active_audio_context_id()`` without an explicit guard. """ + if not context_id: + logger.debug(f"{self} unable to remove audio context: no context ID provided") + return if self.audio_context_available(context_id): # We just mark the audio context for deletion by appending # None. Once we reach None while handling audio we know we can diff --git a/src/pipecat/transports/smallwebrtc/request_handler.py b/src/pipecat/transports/smallwebrtc/request_handler.py index 63c9bea14..13bf7127c 100644 --- a/src/pipecat/transports/smallwebrtc/request_handler.py +++ b/src/pipecat/transports/smallwebrtc/request_handler.py @@ -224,6 +224,8 @@ class SmallWebRTCRequestHandler: ) answer = pipecat_connection.get_answer() + if answer is None: + raise RuntimeError("SmallWebRTC connection produced no SDP answer") if self._esp32_mode: from pipecat.runner.utils import smallwebrtc_sdp_munging