fix: clean up TypedDict / Optional patterns in 6 more LLM adapters

Same approach as the previous round — apply boundary casts where the
code does dict-style mutation on TypedDict-typed values, narrow at
return sites, and document the LLMSpecificMessage limitation in
realtime adapters that pack history into a single text message.

aws_nova_sonic_adapter.py — pure typing + small narrowing fixes:
- Filter LLMSpecific items in `_from_universal_context_messages`
  (documented).
- `_from_universal_context_message` now declared
  `-> AWSNovaSonicConversationHistoryMessage | None` (it already had
  paths returning None implicitly).
- `get_messages_for_logging` returns `dict[str, Any]` per element
  via `dataclasses.asdict`, matching the declared return type.
- Use a local `role` variable so pyright keeps the narrowing across
  the truthy-content guard.

grok_realtime_adapter.py / inworld_realtime_adapter.py — same shape
of fix as `open_ai_realtime_adapter.py` from the previous batch.
The two files are essentially copies of the OpenAI Realtime adapter,
so the same template applies: cast at the boundary, filter
LLMSpecificMessage with a documented note, replace the implicit-None
fallthrough with `raise ValueError`, and switch the `text_content +=`
pattern (which fails when one of the parts is None) to a
`text_parts.append(...)` + `" ".join(...)` pattern.

open_ai_adapter.py — pure typing. Cast at the
`OpenAILLMInvocationParams` return, narrow the system-instruction
warning's `initial_content` to `str | None`, and cast the custom-tools
list to `list[ChatCompletionToolParam]`.

open_ai_responses_adapter.py — pure typing. Same shape: narrow
`first_content` to `str | None` for the warning resolver, cast the
constructed dict literals at append sites where the target is
`ResponseInputItemParam`, and cast `get_messages_for_logging`'s
return to the declared `list[dict[str, Any]]`.

processors/aggregators/llm_context.py — pure typing. Cast the
deepcopied message in the redaction loop in `get_messages` to
`dict[str, Any]` and the create_image/audio_message return-dict
literals to `LLMContextMessage`.

Removes 6 newly-clean files from the pyright ignore list.

Net: -77 pyright errors (full-config: 680 -> 603).
This commit is contained in:
Paul Kompfner
2026-04-28 17:35:22 -04:00
parent 5e24027fd5
commit 96756bc1f6
7 changed files with 150 additions and 79 deletions

View File

@@ -6,11 +6,6 @@
"exclude": ["**/*_pb2.py", "**/__pycache__"],
"ignore": [
"tests",
"src/pipecat/adapters/services/aws_nova_sonic_adapter.py",
"src/pipecat/adapters/services/grok_realtime_adapter.py",
"src/pipecat/adapters/services/inworld_realtime_adapter.py",
"src/pipecat/adapters/services/open_ai_adapter.py",
"src/pipecat/adapters/services/open_ai_responses_adapter.py",
"src/pipecat/audio/dtmf/utils.py",
"src/pipecat/audio/filters/aic_filter.py",
"src/pipecat/audio/filters/krisp_viva_filter.py",
@@ -18,7 +13,6 @@
"src/pipecat/audio/turn/smart_turn/local_smart_turn_v2.py",
"src/pipecat/audio/turn/smart_turn/local_smart_turn_v3.py",
"src/pipecat/audio/vad/silero.py",
"src/pipecat/processors/aggregators/llm_context.py",
"src/pipecat/processors/aggregators/llm_response_universal.py",
"src/pipecat/processors/frame_processor.py",
"src/pipecat/processors/frameworks/langchain.py",

View File

@@ -8,9 +8,9 @@
import copy
import json
from dataclasses import dataclass
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, TypedDict
from typing import Any, TypedDict, cast
from loguru import logger
@@ -110,7 +110,10 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Returns:
List of messages in a format ready for logging about AWS Nova Sonic.
"""
return self._from_universal_context_messages(self.get_messages(context)).messages
return [
asdict(m)
for m in self._from_universal_context_messages(self.get_messages(context)).messages
]
@dataclass
class ConvertedMessages:
@@ -123,18 +126,27 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
self, universal_context_messages: list[LLMContextMessage]
) -> ConvertedMessages:
system_instruction = None
messages = []
messages: list[AWSNovaSonicConversationHistoryMessage] = []
# Bail if there are no messages
if not universal_context_messages:
return self.ConvertedMessages()
return self.ConvertedMessages(messages=[])
universal_context_messages = copy.deepcopy(universal_context_messages)
# NOTE: This adapter does not yet handle ``LLMSpecificMessage`` —
# those are filtered out below (the role-extraction and conversion
# logic only applies to standard message dicts). If/when this
# adapter grows a per-provider passthrough like the Anthropic
# adapter has, LLMSpecific items can flow through.
ucm: list[dict[str, Any]] = [
cast(dict[str, Any], m)
for m in copy.deepcopy(universal_context_messages)
if isinstance(m, dict)
]
# If we have a "system" message as our first message,
# pull that out into "instruction"
if universal_context_messages[0].get("role") == "system":
system = universal_context_messages.pop(0)
if ucm and ucm[0].get("role") == "system":
system = ucm.pop(0)
content = system.get("content")
if isinstance(content, str):
system_instruction = content
@@ -145,19 +157,21 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
# Convert any remaining "system"/"developer" messages to "user",
# as Nova Sonic only supports "user" and "assistant" in history.
for msg in universal_context_messages:
for msg in ucm:
if msg.get("role") in ("system", "developer"):
msg["role"] = "user"
# Process remaining messages to fill out conversation history.
for universal_context_message in universal_context_messages:
for universal_context_message in ucm:
message = self._from_universal_context_message(universal_context_message)
if message:
messages.append(message)
return self.ConvertedMessages(messages=messages, system_instruction=system_instruction)
def _from_universal_context_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
def _from_universal_context_message(
self, message: dict[str, Any]
) -> AWSNovaSonicConversationHistoryMessage | None:
"""Convert standard message format to Nova Sonic format.
Args:
@@ -167,17 +181,18 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Nova Sonic conversation history message, or None if not convertible.
"""
role = message.get("role")
if message.get("role") == "user" or message.get("role") == "assistant":
if role == "user" or role == "assistant":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if isinstance(content, list):
text_parts = []
for c in content:
if c.get("type") == "text":
content += " " + c.get("text")
text_parts.append(c.get("text"))
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
content = " ".join(t for t in text_parts if t)
# There won't be content if this is an assistant tool call entry.
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
# history

View File

@@ -13,7 +13,7 @@ Grok's Voice Agent API.
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict
from typing import Any, TypedDict, cast
from loguru import logger
@@ -85,7 +85,10 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
List of messages with sensitive data redacted.
"""
return self.get_messages(context, truncate_large_values=True)
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
@dataclass
class ConvertedMessages:
@@ -111,11 +114,20 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
if not universal_context_messages:
return self.ConvertedMessages(messages=[])
messages = copy.deepcopy(universal_context_messages)
# NOTE: This adapter does not yet handle ``LLMSpecificMessage`` —
# those are filtered out below. Other adapters (e.g. Anthropic)
# dispatch LLMSpecific items through a per-provider passthrough.
# The pack-into-single-text-message strategy here doesn't compose
# with opaque per-provider payloads.
messages: list[dict[str, Any]] = [
cast(dict[str, Any], m)
for m in copy.deepcopy(universal_context_messages)
if isinstance(m, dict)
]
system_instruction = None
# Extract system message as session instructions
if messages[0].get("role") == "system":
if messages and messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
@@ -133,7 +145,9 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
# Single user message can be sent normally
if len(messages) == 1 and messages[0].get("role") == "user":
return self.ConvertedMessages(
messages=[self._from_universal_context_message(messages[0])],
messages=[
self._from_universal_context_message(cast(LLMContextMessage, messages[0]))
],
system_instruction=system_instruction,
)
@@ -181,26 +195,29 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
ConversationItem formatted for Grok Realtime API.
"""
if message.get("role") == "user":
content = message.get("content")
# NOTE: ``LLMSpecificMessage`` is not yet handled here — see the
# corresponding note in `_from_universal_context_messages`.
msg = cast(dict[str, Any], message)
if msg.get("role") == "user":
content = msg.get("content")
if isinstance(content, list):
text_content = ""
text_parts = []
for c in content:
if c.get("type") == "text":
text_content += " " + c.get("text")
text_parts.append(c.get("text"))
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
f"Unhandled content type in context message: {c.get('type')} - {msg}"
)
content = text_content.strip()
content = " ".join(t for t in text_parts if t).strip()
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tc = msg["tool_calls"][0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
@@ -208,7 +225,7 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
raise ValueError(f"Unhandled message type in _from_universal_context_message: {msg}")
@staticmethod
def _to_grok_function_format(function: FunctionSchema) -> dict[str, Any]:

View File

@@ -13,7 +13,7 @@ Inworld's Realtime API.
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict
from typing import Any, TypedDict, cast
from loguru import logger
@@ -85,7 +85,10 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
List of messages with sensitive data redacted.
"""
return self.get_messages(context, truncate_large_values=True)
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
@dataclass
class ConvertedMessages:
@@ -111,11 +114,20 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
if not universal_context_messages:
return self.ConvertedMessages(messages=[])
messages = copy.deepcopy(universal_context_messages)
# NOTE: This adapter does not yet handle ``LLMSpecificMessage`` —
# those are filtered out below. Other adapters (e.g. Anthropic)
# dispatch LLMSpecific items through a per-provider passthrough.
# The pack-into-single-text-message strategy here doesn't compose
# with opaque per-provider payloads.
messages: list[dict[str, Any]] = [
cast(dict[str, Any], m)
for m in copy.deepcopy(universal_context_messages)
if isinstance(m, dict)
]
system_instruction = None
# Extract system message as session instructions
if messages[0].get("role") == "system":
if messages and messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
@@ -133,7 +145,9 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
# Single user message can be sent normally
if len(messages) == 1 and messages[0].get("role") == "user":
return self.ConvertedMessages(
messages=[self._from_universal_context_message(messages[0])],
messages=[
self._from_universal_context_message(cast(LLMContextMessage, messages[0]))
],
system_instruction=system_instruction,
)
@@ -181,26 +195,29 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
ConversationItem formatted for Inworld Realtime API.
"""
if message.get("role") == "user":
content = message.get("content")
# NOTE: ``LLMSpecificMessage`` is not yet handled here — see the
# corresponding note in `_from_universal_context_messages`.
msg = cast(dict[str, Any], message)
if msg.get("role") == "user":
content = msg.get("content")
if isinstance(content, list):
text_content = ""
text_parts = []
for c in content:
if c.get("type") == "text":
text_content += " " + c.get("text")
text_parts.append(c.get("text"))
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
f"Unhandled content type in context message: {c.get('type')} - {msg}"
)
content = text_content.strip()
content = " ".join(t for t in text_parts if t).strip()
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tc = msg["tool_calls"][0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
@@ -208,7 +225,7 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
raise ValueError(f"Unhandled message type in _from_universal_context_message: {msg}")
@staticmethod
def _to_inworld_function_format(function: FunctionSchema) -> dict[str, Any]:

View File

@@ -127,12 +127,15 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
)
if system_instruction:
# Detect initial system message for warning purposes (don't extract)
initial_content = (
messages[0].get("content", "")
if messages and messages[0].get("role") == "system"
else None
)
# Detect initial system message for warning purposes (don't extract).
# ChatCompletionMessageParam.content is `str | Iterable[...]`; we
# only forward it for warning purposes, so coerce non-strings to
# None — the resolver handles None.
initial_content: str | None = None
if messages and messages[0].get("role") == "system":
raw_content = messages[0].get("content", "")
if isinstance(raw_content, str):
initial_content = raw_content
self._resolve_system_instruction(
initial_content,
system_instruction,
@@ -140,12 +143,15 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
)
messages = [{"role": "system", "content": system_instruction}] + messages
return {
"messages": messages,
# NOTE; LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools),
"tool_choice": _openai_from_llm_context_tool_choice(context.tool_choice),
}
return cast(
OpenAILLMInvocationParams,
{
"messages": messages,
# NOTE; LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools),
"tool_choice": _openai_from_llm_context_tool_choice(context.tool_choice),
},
)
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> list[ChatCompletionToolParam]:
"""Convert function schemas to OpenAI's function-calling format.
@@ -158,13 +164,19 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
with ChatCompletion API.
"""
functions_schema = tools_schema.standard_tools
formatted_standard_tools = [
ChatCompletionToolParam(type="function", function=func.to_default_dict())
# `function=...` expects a `FunctionDefinition` TypedDict; the dict
# produced by `to_default_dict()` is structurally compatible. Cast at
# the boundary.
formatted_standard_tools: list[ChatCompletionToolParam] = [
ChatCompletionToolParam(type="function", function=cast(Any, func.to_default_dict()))
for func in functions_schema
]
custom_openai_tools = []
custom_openai_tools: list[ChatCompletionToolParam] = []
if tools_schema.custom_tools:
custom_openai_tools = tools_schema.custom_tools.get(AdapterType.OPENAI, [])
custom_openai_tools = cast(
list[ChatCompletionToolParam],
tools_schema.custom_tools.get(AdapterType.OPENAI, []),
)
return formatted_standard_tools + custom_openai_tools
def get_messages_for_logging(self, context: LLMContext) -> list[dict[str, Any]]:
@@ -178,7 +190,10 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
Returns:
List of messages in a format ready for logging about OpenAI.
"""
return self.get_messages(context, truncate_large_values=True)
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
def _from_universal_context_messages(
self,

View File

@@ -6,7 +6,7 @@
"""OpenAI Responses API adapter for Pipecat."""
from typing import Any, TypedDict
from typing import Any, TypedDict, cast
from openai._types import NotGiven as OpenAINotGiven
from openai.types.responses import FunctionToolParam, ResponseInputItemParam, ToolParam
@@ -64,8 +64,11 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
if system_instruction and messages:
first_msg = messages[0] if not isinstance(messages[0], LLMSpecificMessage) else None
if first_msg and first_msg.get("role") == "system":
# `content` is `str | Iterable[...]`; we only forward it for
# warning purposes. Coerce non-strings to None.
first_content = first_msg.get("content", "")
self._resolve_system_instruction(
first_msg.get("content", ""),
first_content if isinstance(first_content, str) else None,
system_instruction,
discard_context_system=False,
)
@@ -143,7 +146,10 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
Returns:
List of messages in a format ready for logging.
"""
return self.get_messages(context, truncate_large_values=True)
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
def _convert_messages_to_input(
self, messages: list[LLMContextMessage]
@@ -169,13 +175,15 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append({"role": "developer", "content": content})
result.append(
cast(ResponseInputItemParam, {"role": "developer", "content": content})
)
elif role == "user":
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append({"role": "user", "content": content})
result.append(cast(ResponseInputItemParam, {"role": "user", "content": content}))
elif role == "assistant":
tool_calls = message.get("tool_calls")
@@ -194,7 +202,9 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append({"role": "assistant", "content": content})
result.append(
cast(ResponseInputItemParam, {"role": "assistant", "content": content})
)
elif role == "tool":
content = message.get("content", "")

View File

@@ -21,7 +21,7 @@ import io
import wave
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, TypeAlias, TypeGuard, TypeVar
from typing import Any, TypeAlias, TypeGuard, TypeVar, cast
from loguru import logger
from openai._types import NOT_GIVEN as OPEN_AI_NOT_GIVEN
@@ -129,13 +129,13 @@ class LLMContext:
url: The URL of the image.
text: Optional text to include with the image.
"""
content = []
content: list[dict[str, Any]] = []
if text:
content.append({"type": "text", "text": text})
content.append({"type": "image_url", "image_url": {"url": url}})
return {"role": role, "content": content}
return cast(LLMContextMessage, {"role": role, "content": content})
@staticmethod
async def create_image_message(
@@ -187,7 +187,7 @@ class LLMContext:
audio_frames: List of audio frame objects to include.
text: Optional text to include with the audio.
"""
content = [{"type": "text", "text": text}]
content: list[dict[str, Any]] = [{"type": "text", "text": text}]
def encode_audio():
sample_rate = audio_frames[0].sample_rate
@@ -214,7 +214,7 @@ class LLMContext:
}
)
return {"role": role, "content": content}
return cast(LLMContextMessage, {"role": role, "content": content})
@property
def messages(self) -> list[LLMContextMessage]:
@@ -295,7 +295,10 @@ class LLMContext:
result.append(msg_copy)
continue
msg = copy.deepcopy(message)
# The standard message variant is a union of TypedDicts; the
# mutations below operate on plain dicts at runtime. Treat as
# such for the duration of the redaction loop.
msg: dict[str, Any] = cast(dict[str, Any], copy.deepcopy(message))
content = msg.get("content")
if isinstance(content, list):
for item in content: