Merge pull request #3011 from thsunkid/feat/add-cached-reasoning-tokens-metrics-to-opentel-spans
This commit is contained in:
@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added `cache_read_input_tokens`, `cache_creation_input_tokens` and
|
||||
`reasoning_tokens` to OTel spans for LLM call
|
||||
|
||||
- Added `LiveKitRESTHelper` utility class for managing LiveKit rooms via REST API.
|
||||
|
||||
- Added `DeepgramSageMakerSTTService` which connects to a SageMaker hosted
|
||||
|
||||
@@ -1723,6 +1723,8 @@ class GeminiLiveLLMService(LLMService):
|
||||
prompt_tokens=prompt_tokens,
|
||||
completion_tokens=completion_tokens,
|
||||
total_tokens=total_tokens,
|
||||
cache_read_input_tokens=usage.cached_content_token_count,
|
||||
reasoning_tokens=usage.thoughts_token_count,
|
||||
)
|
||||
|
||||
await self.start_llm_usage_metrics(tokens)
|
||||
|
||||
@@ -123,6 +123,8 @@ class GrokLLMService(OpenAILLMService):
|
||||
self._prompt_tokens = 0
|
||||
self._completion_tokens = 0
|
||||
self._total_tokens = 0
|
||||
self._cache_read_input_tokens = None
|
||||
self._reasoning_tokens = None
|
||||
self._has_reported_prompt_tokens = False
|
||||
self._is_processing = True
|
||||
|
||||
@@ -137,6 +139,8 @@ class GrokLLMService(OpenAILLMService):
|
||||
prompt_tokens=self._prompt_tokens,
|
||||
completion_tokens=self._completion_tokens,
|
||||
total_tokens=self._total_tokens,
|
||||
cache_read_input_tokens=self._cache_read_input_tokens,
|
||||
reasoning_tokens=self._reasoning_tokens,
|
||||
)
|
||||
await super().start_llm_usage_metrics(tokens)
|
||||
|
||||
@@ -149,7 +153,7 @@ class GrokLLMService(OpenAILLMService):
|
||||
|
||||
Args:
|
||||
tokens: The token usage metrics for the current chunk of processing,
|
||||
containing prompt_tokens and completion_tokens counts.
|
||||
containing prompt_tokens, completion_tokens, and optional cached/reasoning tokens.
|
||||
"""
|
||||
# Only accumulate metrics during active processing
|
||||
if not self._is_processing:
|
||||
@@ -164,6 +168,13 @@ class GrokLLMService(OpenAILLMService):
|
||||
if tokens.completion_tokens > self._completion_tokens:
|
||||
self._completion_tokens = tokens.completion_tokens
|
||||
|
||||
# Capture cached & reasoning tokens (these typically only appear once per request)
|
||||
if tokens.cache_read_input_tokens is not None:
|
||||
self._cache_read_input_tokens = tokens.cache_read_input_tokens
|
||||
|
||||
if tokens.reasoning_tokens is not None:
|
||||
self._reasoning_tokens = tokens.reasoning_tokens
|
||||
|
||||
def create_context_aggregator(
|
||||
self,
|
||||
context: OpenAILLMContext,
|
||||
|
||||
@@ -346,11 +346,17 @@ class BaseOpenAILLMService(LLMService):
|
||||
if chunk.usage.prompt_tokens_details
|
||||
else None
|
||||
)
|
||||
reasoning_tokens = (
|
||||
chunk.usage.completion_tokens_details.reasoning_tokens
|
||||
if chunk.usage.completion_tokens_details
|
||||
else None
|
||||
)
|
||||
tokens = LLMTokenUsage(
|
||||
prompt_tokens=chunk.usage.prompt_tokens,
|
||||
completion_tokens=chunk.usage.completion_tokens,
|
||||
total_tokens=chunk.usage.total_tokens,
|
||||
cache_read_input_tokens=cached_tokens,
|
||||
reasoning_tokens=reasoning_tokens,
|
||||
)
|
||||
await self.start_llm_usage_metrics(tokens)
|
||||
|
||||
|
||||
@@ -57,7 +57,6 @@ from pipecat.processors.aggregators.openai_llm_context import (
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
|
||||
from pipecat.services.openai.llm import OpenAIContextAggregatorPair
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt
|
||||
@@ -657,10 +656,17 @@ class OpenAIRealtimeLLMService(LLMService):
|
||||
async def _handle_evt_response_done(self, evt):
|
||||
# todo: figure out whether there's anything we need to do for "cancelled" events
|
||||
# usage metrics
|
||||
cached_tokens = (
|
||||
evt.response.usage.input_token_details.cached_tokens
|
||||
if hasattr(evt.response.usage, "input_token_details")
|
||||
and evt.response.usage.input_token_details
|
||||
else None
|
||||
)
|
||||
tokens = LLMTokenUsage(
|
||||
prompt_tokens=evt.response.usage.input_tokens,
|
||||
completion_tokens=evt.response.usage.output_tokens,
|
||||
total_tokens=evt.response.usage.total_tokens,
|
||||
cache_read_input_tokens=cached_tokens,
|
||||
)
|
||||
await self.start_llm_usage_metrics(tokens)
|
||||
await self.stop_processing_metrics()
|
||||
@@ -810,7 +816,7 @@ class OpenAIRealtimeLLMService(LLMService):
|
||||
# We're done configuring the LLM for this session
|
||||
self._llm_needs_conversation_setup = False
|
||||
|
||||
logger.debug(f"Creating response")
|
||||
logger.debug("Creating response")
|
||||
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
|
||||
@@ -92,6 +92,24 @@ def _add_token_usage_to_span(span, token_usage):
|
||||
span.set_attribute("gen_ai.usage.input_tokens", token_usage["prompt_tokens"])
|
||||
if "completion_tokens" in token_usage:
|
||||
span.set_attribute("gen_ai.usage.output_tokens", token_usage["completion_tokens"])
|
||||
# Add cached token metrics for dictionary
|
||||
if (
|
||||
"cache_read_input_tokens" in token_usage
|
||||
and token_usage["cache_read_input_tokens"] is not None
|
||||
):
|
||||
span.set_attribute(
|
||||
"gen_ai.usage.cache_read_input_tokens", token_usage["cache_read_input_tokens"]
|
||||
)
|
||||
if (
|
||||
"cache_creation_input_tokens" in token_usage
|
||||
and token_usage["cache_creation_input_tokens"] is not None
|
||||
):
|
||||
span.set_attribute(
|
||||
"gen_ai.usage.cache_creation_input_tokens",
|
||||
token_usage["cache_creation_input_tokens"],
|
||||
)
|
||||
if "reasoning_tokens" in token_usage and token_usage["reasoning_tokens"] is not None:
|
||||
span.set_attribute("gen_ai.usage.reasoning_tokens", token_usage["reasoning_tokens"])
|
||||
else:
|
||||
# Handle LLMTokenUsage object
|
||||
span.set_attribute("gen_ai.usage.input_tokens", getattr(token_usage, "prompt_tokens", 0))
|
||||
@@ -99,6 +117,19 @@ def _add_token_usage_to_span(span, token_usage):
|
||||
"gen_ai.usage.output_tokens", getattr(token_usage, "completion_tokens", 0)
|
||||
)
|
||||
|
||||
# Add cached token metrics for LLMTokenUsage object
|
||||
cache_read_tokens = getattr(token_usage, "cache_read_input_tokens", None)
|
||||
if cache_read_tokens is not None:
|
||||
span.set_attribute("gen_ai.usage.cache_read_input_tokens", cache_read_tokens)
|
||||
|
||||
cache_creation_tokens = getattr(token_usage, "cache_creation_input_tokens", None)
|
||||
if cache_creation_tokens is not None:
|
||||
span.set_attribute("gen_ai.usage.cache_creation_input_tokens", cache_creation_tokens)
|
||||
|
||||
reasoning_tokens = getattr(token_usage, "reasoning_tokens", None)
|
||||
if reasoning_tokens is not None:
|
||||
span.set_attribute("gen_ai.usage.reasoning_tokens", reasoning_tokens)
|
||||
|
||||
|
||||
def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) -> Callable:
|
||||
"""Trace TTS service methods with TTS-specific attributes.
|
||||
@@ -715,7 +746,7 @@ def traced_gemini_live(operation: str) -> Callable:
|
||||
else:
|
||||
operation_attrs["tool.result_status"] = "completed"
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
except json.JSONDecodeError:
|
||||
operation_attrs["tool.result"] = (
|
||||
f"Invalid JSON: {str(result_content)[:500]}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user