diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b558d0f1..62a1fe80c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `cache_read_input_tokens`, `cache_creation_input_tokens` and + `reasoning_tokens` to OTel spans for LLM call + - Added `LiveKitRESTHelper` utility class for managing LiveKit rooms via REST API. - Added `DeepgramSageMakerSTTService` which connects to a SageMaker hosted diff --git a/src/pipecat/services/google/gemini_live/llm.py b/src/pipecat/services/google/gemini_live/llm.py index 2c6e8e463..6fd195c0d 100644 --- a/src/pipecat/services/google/gemini_live/llm.py +++ b/src/pipecat/services/google/gemini_live/llm.py @@ -1723,6 +1723,8 @@ class GeminiLiveLLMService(LLMService): prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, total_tokens=total_tokens, + cache_read_input_tokens=usage.cached_content_token_count, + reasoning_tokens=usage.thoughts_token_count, ) await self.start_llm_usage_metrics(tokens) diff --git a/src/pipecat/services/grok/llm.py b/src/pipecat/services/grok/llm.py index 684be2d39..625286a60 100644 --- a/src/pipecat/services/grok/llm.py +++ b/src/pipecat/services/grok/llm.py @@ -123,6 +123,8 @@ class GrokLLMService(OpenAILLMService): self._prompt_tokens = 0 self._completion_tokens = 0 self._total_tokens = 0 + self._cache_read_input_tokens = None + self._reasoning_tokens = None self._has_reported_prompt_tokens = False self._is_processing = True @@ -137,6 +139,8 @@ class GrokLLMService(OpenAILLMService): prompt_tokens=self._prompt_tokens, completion_tokens=self._completion_tokens, total_tokens=self._total_tokens, + cache_read_input_tokens=self._cache_read_input_tokens, + reasoning_tokens=self._reasoning_tokens, ) await super().start_llm_usage_metrics(tokens) @@ -149,7 +153,7 @@ class GrokLLMService(OpenAILLMService): Args: tokens: The token usage metrics for the current chunk of processing, - containing prompt_tokens and completion_tokens counts. + containing prompt_tokens, completion_tokens, and optional cached/reasoning tokens. """ # Only accumulate metrics during active processing if not self._is_processing: @@ -164,6 +168,13 @@ class GrokLLMService(OpenAILLMService): if tokens.completion_tokens > self._completion_tokens: self._completion_tokens = tokens.completion_tokens + # Capture cached & reasoning tokens (these typically only appear once per request) + if tokens.cache_read_input_tokens is not None: + self._cache_read_input_tokens = tokens.cache_read_input_tokens + + if tokens.reasoning_tokens is not None: + self._reasoning_tokens = tokens.reasoning_tokens + def create_context_aggregator( self, context: OpenAILLMContext, diff --git a/src/pipecat/services/openai/base_llm.py b/src/pipecat/services/openai/base_llm.py index d020e1106..86ed6bdbc 100644 --- a/src/pipecat/services/openai/base_llm.py +++ b/src/pipecat/services/openai/base_llm.py @@ -346,11 +346,17 @@ class BaseOpenAILLMService(LLMService): if chunk.usage.prompt_tokens_details else None ) + reasoning_tokens = ( + chunk.usage.completion_tokens_details.reasoning_tokens + if chunk.usage.completion_tokens_details + else None + ) tokens = LLMTokenUsage( prompt_tokens=chunk.usage.prompt_tokens, completion_tokens=chunk.usage.completion_tokens, total_tokens=chunk.usage.total_tokens, cache_read_input_tokens=cached_tokens, + reasoning_tokens=reasoning_tokens, ) await self.start_llm_usage_metrics(tokens) diff --git a/src/pipecat/services/openai/realtime/llm.py b/src/pipecat/services/openai/realtime/llm.py index 755e64040..bea9986f3 100644 --- a/src/pipecat/services/openai/realtime/llm.py +++ b/src/pipecat/services/openai/realtime/llm.py @@ -57,7 +57,6 @@ from pipecat.processors.aggregators.openai_llm_context import ( ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.llm_service import FunctionCallFromLLM, LLMService -from pipecat.services.openai.llm import OpenAIContextAggregatorPair from pipecat.transcriptions.language import Language from pipecat.utils.time import time_now_iso8601 from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt @@ -657,10 +656,17 @@ class OpenAIRealtimeLLMService(LLMService): async def _handle_evt_response_done(self, evt): # todo: figure out whether there's anything we need to do for "cancelled" events # usage metrics + cached_tokens = ( + evt.response.usage.input_token_details.cached_tokens + if hasattr(evt.response.usage, "input_token_details") + and evt.response.usage.input_token_details + else None + ) tokens = LLMTokenUsage( prompt_tokens=evt.response.usage.input_tokens, completion_tokens=evt.response.usage.output_tokens, total_tokens=evt.response.usage.total_tokens, + cache_read_input_tokens=cached_tokens, ) await self.start_llm_usage_metrics(tokens) await self.stop_processing_metrics() @@ -810,7 +816,7 @@ class OpenAIRealtimeLLMService(LLMService): # We're done configuring the LLM for this session self._llm_needs_conversation_setup = False - logger.debug(f"Creating response") + logger.debug("Creating response") await self.push_frame(LLMFullResponseStartFrame()) await self.start_processing_metrics() diff --git a/src/pipecat/utils/tracing/service_decorators.py b/src/pipecat/utils/tracing/service_decorators.py index ead4aa9e8..7bcb3b63e 100644 --- a/src/pipecat/utils/tracing/service_decorators.py +++ b/src/pipecat/utils/tracing/service_decorators.py @@ -92,6 +92,24 @@ def _add_token_usage_to_span(span, token_usage): span.set_attribute("gen_ai.usage.input_tokens", token_usage["prompt_tokens"]) if "completion_tokens" in token_usage: span.set_attribute("gen_ai.usage.output_tokens", token_usage["completion_tokens"]) + # Add cached token metrics for dictionary + if ( + "cache_read_input_tokens" in token_usage + and token_usage["cache_read_input_tokens"] is not None + ): + span.set_attribute( + "gen_ai.usage.cache_read_input_tokens", token_usage["cache_read_input_tokens"] + ) + if ( + "cache_creation_input_tokens" in token_usage + and token_usage["cache_creation_input_tokens"] is not None + ): + span.set_attribute( + "gen_ai.usage.cache_creation_input_tokens", + token_usage["cache_creation_input_tokens"], + ) + if "reasoning_tokens" in token_usage and token_usage["reasoning_tokens"] is not None: + span.set_attribute("gen_ai.usage.reasoning_tokens", token_usage["reasoning_tokens"]) else: # Handle LLMTokenUsage object span.set_attribute("gen_ai.usage.input_tokens", getattr(token_usage, "prompt_tokens", 0)) @@ -99,6 +117,19 @@ def _add_token_usage_to_span(span, token_usage): "gen_ai.usage.output_tokens", getattr(token_usage, "completion_tokens", 0) ) + # Add cached token metrics for LLMTokenUsage object + cache_read_tokens = getattr(token_usage, "cache_read_input_tokens", None) + if cache_read_tokens is not None: + span.set_attribute("gen_ai.usage.cache_read_input_tokens", cache_read_tokens) + + cache_creation_tokens = getattr(token_usage, "cache_creation_input_tokens", None) + if cache_creation_tokens is not None: + span.set_attribute("gen_ai.usage.cache_creation_input_tokens", cache_creation_tokens) + + reasoning_tokens = getattr(token_usage, "reasoning_tokens", None) + if reasoning_tokens is not None: + span.set_attribute("gen_ai.usage.reasoning_tokens", reasoning_tokens) + def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) -> Callable: """Trace TTS service methods with TTS-specific attributes. @@ -715,7 +746,7 @@ def traced_gemini_live(operation: str) -> Callable: else: operation_attrs["tool.result_status"] = "completed" - except json.JSONDecodeError as e: + except json.JSONDecodeError: operation_attrs["tool.result"] = ( f"Invalid JSON: {str(result_content)[:500]}" )