Merge pull request #1859 from pipecat-ai/mb/otel-attribute-naming

Update OTel attribute names
This commit is contained in:
Mark Backman
2025-05-21 12:10:15 -04:00
committed by GitHub
2 changed files with 70 additions and 31 deletions

View File

@@ -18,6 +18,39 @@ if is_tracing_available():
from opentelemetry.trace import Span
def _get_gen_ai_system_from_service_name(service_name: str) -> str:
"""Extract the standardized gen_ai.system value from a service class name.
Source:
https://opentelemetry.io/docs/specs/semconv/attributes-registry/gen-ai/#gen-ai-system
Uses standard OTel names where possible, with special case mappings for
service names that don't follow the pattern.
"""
SPECIAL_CASE_MAPPINGS = {
# AWS
"AWSBedrockLLMService": "aws.bedrock",
# Azure
"AzureLLMService": "az.ai.openai",
# Google
"GoogleLLMService": "gcp.gemini",
"GoogleLLMOpenAIBetaService": "gcp.gemini",
"GoogleVertexLLMService": "gcp.vertex_ai",
# Others
"GrokLLMService": "xai",
}
if service_name in SPECIAL_CASE_MAPPINGS:
return SPECIAL_CASE_MAPPINGS[service_name]
if service_name.endswith("LLMService"):
provider = service_name[:-10].lower()
else:
provider = service_name.lower()
return provider
def add_tts_span_attributes(
span: "Span",
service_name: str,
@@ -45,17 +78,18 @@ def add_tts_span_attributes(
**kwargs: Additional attributes to add
"""
# Add standard attributes
span.set_attribute("service.name", service_name)
span.set_attribute("model", model)
span.set_attribute("gen_ai.system", service_name.replace("TTSService", "").lower())
span.set_attribute("gen_ai.request.model", model)
span.set_attribute("gen_ai.operation.name", operation_name)
span.set_attribute("gen_ai.output.type", "speech")
span.set_attribute("voice_id", voice_id)
span.set_attribute("operation", operation_name)
# Add optional attributes
if text:
span.set_attribute("text", text)
if character_count is not None:
span.set_attribute("metrics.tts.character_count", character_count)
span.set_attribute("metrics.character_count", character_count)
if ttfb_ms is not None:
span.set_attribute("metrics.ttfb_ms", ttfb_ms)
@@ -76,6 +110,7 @@ def add_stt_span_attributes(
span: "Span",
service_name: str,
model: str,
operation_name: str = "stt",
transcript: Optional[str] = None,
is_final: Optional[bool] = None,
language: Optional[str] = None,
@@ -90,6 +125,7 @@ def add_stt_span_attributes(
span: The span to add attributes to
service_name: Name of the STT service (e.g., "deepgram")
model: Model name/identifier
operation_name: Name of the operation (default: "stt")
transcript: The transcribed text
is_final: Whether this is a final transcript
language: Detected or configured language
@@ -99,8 +135,9 @@ def add_stt_span_attributes(
**kwargs: Additional attributes to add
"""
# Add standard attributes
span.set_attribute("service.name", service_name)
span.set_attribute("model", model)
span.set_attribute("gen_ai.system", service_name.replace("STTService", "").lower())
span.set_attribute("gen_ai.request.model", model)
span.set_attribute("gen_ai.operation.name", operation_name)
span.set_attribute("vad_enabled", vad_enabled)
# Add optional attributes
@@ -161,13 +198,15 @@ def add_llm_span_attributes(
**kwargs: Additional attributes to add
"""
# Add standard attributes
span.set_attribute("service.name", service_name)
span.set_attribute("model", model)
span.set_attribute("gen_ai.system", _get_gen_ai_system_from_service_name(service_name))
span.set_attribute("gen_ai.request.model", model)
span.set_attribute("gen_ai.operation.name", "chat")
span.set_attribute("gen_ai.output.type", "text")
span.set_attribute("stream", stream)
# Add optional attributes
if messages:
span.set_attribute("messages", messages)
span.set_attribute("input", messages)
if tools:
span.set_attribute("tools", tools)
@@ -188,7 +227,19 @@ def add_llm_span_attributes(
if parameters:
for key, value in parameters.items():
if isinstance(value, (str, int, float, bool)):
span.set_attribute(f"param.{key}", value)
if key in [
"temperature",
"max_tokens",
"max_completion_tokens",
"top_p",
"top_k",
"frequency_penalty",
"presence_penalty",
"seed",
]:
span.set_attribute(f"gen_ai.request.{key}", value)
else:
span.set_attribute(f"param.{key}", value)
# Add extra parameters if provided
if extra_parameters:

View File

@@ -67,20 +67,6 @@ def _get_parent_service_context(self):
return context_api.get_current()
def _get_service_name(self, service_prefix: str) -> str:
"""Generate a default span name using service type and class name.
Args:
self: The service instance.
service_prefix: The service type (e.g., 'llm', 'stt', 'tts').
Returns:
A default span name string like "type_classname" (e.g. llm_openaillmservice).
"""
service_class_name = self.__class__.__name__.lower()
return f"{service_prefix}_{service_class_name}"
def _add_token_usage_to_span(span, token_usage):
"""Add token usage metrics to a span (internal use only).
@@ -93,13 +79,15 @@ def _add_token_usage_to_span(span, token_usage):
if isinstance(token_usage, dict):
if "prompt_tokens" in token_usage:
span.set_attribute("llm.prompt_tokens", token_usage["prompt_tokens"])
span.set_attribute("gen_ai.usage.input_tokens", token_usage["prompt_tokens"])
if "completion_tokens" in token_usage:
span.set_attribute("llm.completion_tokens", token_usage["completion_tokens"])
span.set_attribute("gen_ai.usage.output_tokens", token_usage["completion_tokens"])
else:
# Handle LLMTokenUsage object
span.set_attribute("llm.prompt_tokens", getattr(token_usage, "prompt_tokens", 0))
span.set_attribute("llm.completion_tokens", getattr(token_usage, "completion_tokens", 0))
span.set_attribute("gen_ai.usage.input_tokens", getattr(token_usage, "prompt_tokens", 0))
span.set_attribute(
"gen_ai.usage.output_tokens", getattr(token_usage, "completion_tokens", 0)
)
def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) -> Callable:
@@ -134,7 +122,7 @@ def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) -
return
service_class_name = self.__class__.__name__
span_name = name or _get_service_name(self, "tts")
span_name = "tts"
# Get parent context
turn_context = get_current_turn_context()
@@ -237,7 +225,7 @@ def traced_stt(func: Optional[Callable] = None, *, name: Optional[str] = None) -
return await f(self, transcript, is_final, language)
service_class_name = self.__class__.__name__
span_name = name or _get_service_name(self, "stt")
span_name = "stt"
# Get the turn context first, then fall back to service context
turn_context = get_current_turn_context()
@@ -313,7 +301,7 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
return await f(self, context, *args, **kwargs)
service_class_name = self.__class__.__name__
span_name = name or _get_service_name(self, "llm")
span_name = "llm"
# Get the parent context - turn context if available, otherwise service context
turn_context = get_current_turn_context()