diff --git a/pyrightconfig.json b/pyrightconfig.json index db78dad8c..528229034 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -19,13 +19,10 @@ "src/pipecat/services/aws/nova_sonic/llm.py", "src/pipecat/services/aws/sagemaker/bidi_client.py", "src/pipecat/services/azure/tts.py", - "src/pipecat/services/deepgram/flux/base.py", "src/pipecat/services/deepgram/flux/sagemaker/stt.py", "src/pipecat/services/deepgram/sagemaker/stt.py", "src/pipecat/services/deepgram/sagemaker/tts.py", - "src/pipecat/services/elevenlabs/tts.py", "src/pipecat/services/google/gemini_live/llm.py", - "src/pipecat/services/google/gemini_live/vertex/llm.py", "src/pipecat/services/google/llm.py", "src/pipecat/services/google/stt.py", "src/pipecat/services/google/tts.py", @@ -35,12 +32,10 @@ "src/pipecat/services/llm_service.py", "src/pipecat/services/mem0/memory.py", "src/pipecat/services/mistral/tts.py", - "src/pipecat/services/moondream/vision.py", "src/pipecat/services/nvidia/stt.py", "src/pipecat/services/nvidia/tts.py", "src/pipecat/services/openai/base_llm.py", "src/pipecat/services/openai/realtime/llm.py", - "src/pipecat/services/openai/stt.py", "src/pipecat/services/rime/tts.py", "src/pipecat/services/sambanova/llm.py", "src/pipecat/services/sarvam/stt.py", @@ -52,16 +47,13 @@ "src/pipecat/services/ultravox/llm.py", "src/pipecat/services/whisper/stt.py", "src/pipecat/services/xai/realtime/llm.py", - "src/pipecat/transports/base_output.py", "src/pipecat/transports/daily/transport.py", "src/pipecat/transports/lemonslice/transport.py", "src/pipecat/transports/livekit/transport.py", "src/pipecat/transports/smallwebrtc/connection.py", "src/pipecat/transports/smallwebrtc/transport.py", "src/pipecat/transports/tavus/transport.py", - "src/pipecat/transports/websocket/client.py", - "src/pipecat/transports/websocket/server.py", - "src/pipecat/transports/whatsapp/client.py" + "src/pipecat/transports/websocket/server.py" ], "reportMissingImports": false } diff --git 
a/src/pipecat/services/deepgram/flux/base.py b/src/pipecat/services/deepgram/flux/base.py index 173da5dc7..d2a8a24cd 100644 --- a/src/pipecat/services/deepgram/flux/base.py +++ b/src/pipecat/services/deepgram/flux/base.py @@ -536,6 +536,10 @@ class DeepgramFluxSTTBase(STTService): event = data.get("event") transcript = data.get("transcript", "") + if not isinstance(event, str): + logger.debug(f"Unhandled TurnInfo event (not a string): {event}") + return + try: flux_event_type = FluxEventType(event) except ValueError: @@ -648,7 +652,11 @@ class DeepgramFluxSTTBase(STTService): detected_language = self._primary_detected_language(data) min_confidence = assert_given(self._settings.min_confidence) - if not min_confidence or average_confidence > min_confidence: + # No threshold (None or 0.0) → accept. Otherwise require confidence + # data and compare; drop if data is missing. + if not min_confidence or ( + average_confidence is not None and average_confidence > min_confidence + ): # EndOfTurn means Flux has determined the turn is complete, # so this TranscriptionFrame is always finalized await self.push_frame( diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index 239e6a64e..fe8e7ab84 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -923,7 +923,7 @@ class ElevenLabsTTSService(WebsocketTTSService): self._partial_word_start_time = 0.0 # Initialize context with voice settings and pronunciation dictionaries - msg = {"text": " ", "context_id": context_id} + msg: dict[str, Any] = {"text": " ", "context_id": context_id} if self._voice_settings: msg["voice_settings"] = self._voice_settings if self._pronunciation_dictionary_locators: @@ -1268,7 +1268,7 @@ class ElevenLabsHttpTTSService(TTSService): url = f"{self._base_url}/v1/text-to-speech/{self._settings.voice}/stream/with-timestamps" model_id = assert_given(self._settings.model) - payload: dict[str, str | dict[str, float | bool]] = { 
+ payload: dict[str, Any] = { "text": text, "model_id": model_id, } diff --git a/src/pipecat/services/google/gemini_live/vertex/llm.py b/src/pipecat/services/google/gemini_live/vertex/llm.py index 44ded852f..b02c18a60 100644 --- a/src/pipecat/services/google/gemini_live/vertex/llm.py +++ b/src/pipecat/services/google/gemini_live/vertex/llm.py @@ -174,11 +174,17 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService): default_settings.temperature = params.temperature default_settings.top_k = params.top_k default_settings.top_p = params.top_p - default_settings.modalities = params.modalities + # `params.modalities` and `params.media_resolution` are typed + # `X | None` on the deprecated InputParams, but None isn't + # a valid setting value (downstream uses call `.value` on + # them). Fall back to the canonical defaults. + default_settings.modalities = params.modalities or GeminiModalities.AUDIO default_settings.language = ( language_to_gemini_language(params.language) if params.language else "en-US" ) - default_settings.media_resolution = params.media_resolution + default_settings.media_resolution = ( + params.media_resolution or GeminiMediaResolution.UNSPECIFIED + ) default_settings.vad = params.vad default_settings.context_window_compression = ( params.context_window_compression.model_dump() @@ -233,7 +239,9 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService): ) @staticmethod - def _get_credentials(credentials: str | None, credentials_path: str | None) -> str: + def _get_credentials( + credentials: str | None, credentials_path: str | None + ) -> service_account.Credentials: """Retrieve Credentials using Google service account credentials JSON. Supports multiple authentication methods: @@ -246,7 +254,8 @@ class GeminiLiveVertexLLMService(GeminiLiveLLMService): credentials_path: Path to the service account JSON file. Returns: - OAuth token for API authentication. 
+ A service-account ``Credentials`` object suitable for the Vertex + AI client (with its access token refreshed). Raises: ValueError: If no valid credentials are provided or found. diff --git a/src/pipecat/services/moondream/vision.py b/src/pipecat/services/moondream/vision.py index 01482df4b..49cb0ba34 100644 --- a/src/pipecat/services/moondream/vision.py +++ b/src/pipecat/services/moondream/vision.py @@ -153,9 +153,14 @@ class MoondreamService(VisionService): logger.debug(f"Analyzing image (bytes length: {len(frame.image)})") def get_image_description(image_bytes: bytes, text: str | None) -> str: + if frame.format is None: + raise ValueError("Cannot decode image bytes without a format") image = Image.frombytes(frame.format, frame.size, image_bytes) - image_embeds = self._model.encode_image(image) - description = self._model.query(image_embeds, text)["answer"] + # `encode_image` and `query` are custom methods provided by the + # moondream2 model code (via `trust_remote_code=True`) that pyright + # can't see on `AutoModelForCausalLM`'s base type. 
+ image_embeds = self._model.encode_image(image) # pyright: ignore[reportCallIssue] + description = self._model.query(image_embeds, text)["answer"] # pyright: ignore[reportCallIssue] return description description = await asyncio.to_thread(get_image_description, frame.image, frame.text) diff --git a/src/pipecat/services/openai/stt.py b/src/pipecat/services/openai/stt.py index b7dddf441..6f635639a 100644 --- a/src/pipecat/services/openai/stt.py +++ b/src/pipecat/services/openai/stt.py @@ -18,7 +18,7 @@ import base64 import json from collections.abc import AsyncGenerator from dataclasses import dataclass, field -from typing import Any, Literal +from typing import Any, Literal, cast from loguru import logger @@ -475,6 +475,9 @@ class OpenAIRealtimeSTTService(WebsocketSTTService): async def _connect_websocket(self): """Establish the WebSocket connection to the transcription endpoint.""" try: + # `__init__` raises if websockets isn't installed, so these symbols + # are non-None by the time any method runs. + assert websocket_connect is not None and State is not None if self._websocket and self._websocket.state is State.OPEN: return @@ -534,7 +537,9 @@ class OpenAIRealtimeSTTService(WebsocketSTTService): """Send ``session.update`` to configure the transcription session.""" transcription: dict = {"model": self._settings.model} - language = assert_given(self._settings.language) + # Technically `_settings.language` could be a raw string, but Language + # is a StrEnum so downstream handles either. + language = cast("Language | None", assert_given(self._settings.language)) language_code = self._language_to_code(language) if language else None if language_code: transcription["language"] = language_code @@ -611,6 +616,10 @@ class OpenAIRealtimeSTTService(WebsocketSTTService): Called by ``WebsocketService._receive_task_handler`` which wraps this method with automatic reconnection on connection errors. 
""" + # `_connect` only starts the receive task after `_websocket` is set, + # and reconnects re-establish it before the next iteration, so this + # invariant should always hold when this method runs. + assert self._websocket is not None async for message in self._websocket: try: evt = json.loads(message) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index d485129f4..3f1427627 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -771,13 +771,16 @@ class BaseOutputTransport(FrameProcessor): await self._bot_stopped_speaking() async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]: + # Caller below only invokes this when `self._mixer` is set. + mixer = self._mixer + assert mixer is not None last_frame_time = 0 silence = b"\x00" * self._audio_chunk_size while True: try: frame = self._audio_queue.get_nowait() if isinstance(frame, OutputAudioRawFrame): - frame.audio = await self._mixer.mix(frame.audio) + frame.audio = await mixer.mix(frame.audio) last_frame_time = time.time() yield frame self._audio_queue.task_done() @@ -788,7 +791,7 @@ class BaseOutputTransport(FrameProcessor): await self._bot_stopped_speaking() # Generate an audio frame with only the mixer's part. frame = OutputAudioRawFrame( - audio=await self._mixer.mix(silence), + audio=await mixer.mix(silence), sample_rate=self._sample_rate, num_channels=self._params.audio_out_channels, ) @@ -927,6 +930,11 @@ class BaseOutputTransport(FrameProcessor): """ def resize_frame(frame: OutputImageRawFrame) -> OutputImageRawFrame: + # Without a format we can't decode the bytes, so leave the + # frame as-is and let the transport pass it through unchanged. 
+ if frame.format is None: + return frame + desired_size = (self._params.video_out_width, self._params.video_out_height) # TODO: we should refactor in the future to support dynamic resolutions diff --git a/src/pipecat/transports/websocket/client.py b/src/pipecat/transports/websocket/client.py index 5665dfd23..9f2a43dbc 100644 --- a/src/pipecat/transports/websocket/client.py +++ b/src/pipecat/transports/websocket/client.py @@ -64,9 +64,18 @@ class WebsocketClientCallbacks(BaseModel): on_message: Called when a message is received from the WebSocket. """ - on_connected: Callable[[websockets.WebSocketClientProtocol], Awaitable[None]] - on_disconnected: Callable[[websockets.WebSocketClientProtocol], Awaitable[None]] - on_message: Callable[[websockets.WebSocketClientProtocol, websockets.Data], Awaitable[None]] + on_connected: Callable[ + [websockets.WebSocketClientProtocol], # pyright: ignore[reportAttributeAccessIssue] + Awaitable[None], + ] + on_disconnected: Callable[ + [websockets.WebSocketClientProtocol], # pyright: ignore[reportAttributeAccessIssue] + Awaitable[None], + ] + on_message: Callable[ + [websockets.WebSocketClientProtocol, websockets.Data], # pyright: ignore[reportAttributeAccessIssue] + Awaitable[None], + ] class WebsocketClientSession: @@ -98,7 +107,7 @@ class WebsocketClientSession: self._leave_counter = 0 self._task_manager: BaseTaskManager | None = None - self._websocket: websockets.WebSocketClientProtocol | None = None + self._websocket: websockets.WebSocketClientProtocol | None = None # pyright: ignore[reportAttributeAccessIssue] @property def task_manager(self) -> BaseTaskManager: @@ -192,6 +201,10 @@ class WebsocketClientSession: async def _client_task_handler(self): """Handle incoming messages from the WebSocket connection.""" + # `connect()` only starts this task after `_websocket` is assigned, and + # `disconnect()` cancels the task before clearing `_websocket`, so this + # invariant should always hold when this method runs. 
+ assert self._websocket is not None try: # Handle incoming messages async for message in self._websocket: diff --git a/src/pipecat/transports/whatsapp/client.py b/src/pipecat/transports/whatsapp/client.py index 8f479520f..171a79247 100644 --- a/src/pipecat/transports/whatsapp/client.py +++ b/src/pipecat/transports/whatsapp/client.py @@ -154,8 +154,17 @@ class WhatsAppClient: return int(challenge) - async def _validate_whatsapp_webhook_request(self, raw_body: bytes, sha256_signature: str): + async def _validate_whatsapp_webhook_request( + self, raw_body: bytes | None, sha256_signature: str | None + ): """Common handler for both /start and /connect endpoints.""" + # Callers gate on `self._whatsapp_secret`, so the assert holds. + assert self._whatsapp_secret is not None + if raw_body is None: + raise Exception("Missing raw request body") + if not sha256_signature: + raise Exception("Missing X-Hub-Signature-256 header") + # Compute HMAC SHA256 using your App Secret expected_signature = hmac.new( key=self._whatsapp_secret.encode("utf-8"), @@ -164,8 +173,6 @@ class WhatsAppClient: ).hexdigest() # Extract signature from header (strip 'sha256=' prefix) - if not sha256_signature: - raise Exception("Missing X-Hub-Signature-256 header") received_signature = sha256_signature.split("sha256=")[-1] # Compare signatures securely @@ -306,7 +313,12 @@ class WhatsAppClient: # Create and initialize WebRTC connection pipecat_connection = SmallWebRTCConnection(self._ice_servers) await pipecat_connection.initialize(sdp=call.session.sdp, type=call.session.sdp_type) - sdp_answer = pipecat_connection.get_answer().get("sdp") + answer = pipecat_connection.get_answer() + if answer is None: + raise RuntimeError("SmallWebRTC connection produced no SDP answer") + sdp_answer = answer.get("sdp") + if sdp_answer is None: + raise RuntimeError("SmallWebRTC SDP answer missing 'sdp' field") sdp_answer = self._filter_sdp_for_whatsapp(sdp_answer) logger.debug(f"SDP answer generated for call {call.id}")