Compare commits

...

5 Commits

Author SHA1 Message Date
Filipi Fuchter
5fd9348311 Recording high quality audio. 2025-11-26 10:02:12 -03:00
Mark Backman
19d8b0dfc2 Merge pull request #3011 from thsunkid/feat/add-cached-reasoning-tokens-metrics-to-opentel-spans 2025-11-26 07:45:33 -05:00
Thu Nguyen
36c4bc2df2 Update changelog 2025-11-26 13:01:48 +07:00
Thu Nguyen
42be0183af Merge branch 'main' into feat/add-cached-reasoning-tokens-metrics-to-opentel-spans 2025-11-26 12:59:43 +07:00
Thu Nguyen
35593b8574 Add cached and reasoning token metrics to OpenTelemetry spans 2025-11-09 00:38:30 +07:00
8 changed files with 72 additions and 72 deletions

View File

@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `cache_read_input_tokens`, `cache_creation_input_tokens` and
`reasoning_tokens` to OTel spans for LLM call
- Added `LiveKitRESTHelper` utility class for managing LiveKit rooms via REST API.
- Added `DeepgramSageMakerSTTService` which connects to a SageMaker hosted

View File

@@ -50,25 +50,14 @@ import aiofiles
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
@@ -94,20 +83,10 @@ transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
@@ -115,38 +94,13 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4")
# Create audio buffer processor
audiobuffer = AudioBufferProcessor()
messages = [
{
"role": "system",
"content": "You are a helpful assistant demonstrating audio recording capabilities. Keep your responses brief and clear.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
audiobuffer = AudioBufferProcessor(sample_rate=48000)
pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
audiobuffer, # Add audio buffer to pipeline
context_aggregator.assistant(),
]
)
@@ -155,6 +109,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
audio_in_sample_rate=48000,
audio_out_sample_rate= 48000
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@@ -165,7 +121,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Start recording audio
await audiobuffer.start_recording()
# Start conversation - empty prompt to let LLM follow system instructions
await task.queue_frames([LLMRunFrame()])
# await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -1723,6 +1723,8 @@ class GeminiLiveLLMService(LLMService):
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
cache_read_input_tokens=usage.cached_content_token_count,
reasoning_tokens=usage.thoughts_token_count,
)
await self.start_llm_usage_metrics(tokens)

View File

@@ -123,6 +123,8 @@ class GrokLLMService(OpenAILLMService):
self._prompt_tokens = 0
self._completion_tokens = 0
self._total_tokens = 0
self._cache_read_input_tokens = None
self._reasoning_tokens = None
self._has_reported_prompt_tokens = False
self._is_processing = True
@@ -137,6 +139,8 @@ class GrokLLMService(OpenAILLMService):
prompt_tokens=self._prompt_tokens,
completion_tokens=self._completion_tokens,
total_tokens=self._total_tokens,
cache_read_input_tokens=self._cache_read_input_tokens,
reasoning_tokens=self._reasoning_tokens,
)
await super().start_llm_usage_metrics(tokens)
@@ -149,7 +153,7 @@ class GrokLLMService(OpenAILLMService):
Args:
tokens: The token usage metrics for the current chunk of processing,
containing prompt_tokens and completion_tokens counts.
containing prompt_tokens, completion_tokens, and optional cached/reasoning tokens.
"""
# Only accumulate metrics during active processing
if not self._is_processing:
@@ -164,6 +168,13 @@ class GrokLLMService(OpenAILLMService):
if tokens.completion_tokens > self._completion_tokens:
self._completion_tokens = tokens.completion_tokens
# Capture cached & reasoning tokens (these typically only appear once per request)
if tokens.cache_read_input_tokens is not None:
self._cache_read_input_tokens = tokens.cache_read_input_tokens
if tokens.reasoning_tokens is not None:
self._reasoning_tokens = tokens.reasoning_tokens
def create_context_aggregator(
self,
context: OpenAILLMContext,

View File

@@ -346,11 +346,17 @@ class BaseOpenAILLMService(LLMService):
if chunk.usage.prompt_tokens_details
else None
)
reasoning_tokens = (
chunk.usage.completion_tokens_details.reasoning_tokens
if chunk.usage.completion_tokens_details
else None
)
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,
completion_tokens=chunk.usage.completion_tokens,
total_tokens=chunk.usage.total_tokens,
cache_read_input_tokens=cached_tokens,
reasoning_tokens=reasoning_tokens,
)
await self.start_llm_usage_metrics(tokens)

View File

@@ -57,7 +57,6 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.openai.llm import OpenAIContextAggregatorPair
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt
@@ -657,10 +656,17 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_evt_response_done(self, evt):
# todo: figure out whether there's anything we need to do for "cancelled" events
# usage metrics
cached_tokens = (
evt.response.usage.input_token_details.cached_tokens
if hasattr(evt.response.usage, "input_token_details")
and evt.response.usage.input_token_details
else None
)
tokens = LLMTokenUsage(
prompt_tokens=evt.response.usage.input_tokens,
completion_tokens=evt.response.usage.output_tokens,
total_tokens=evt.response.usage.total_tokens,
cache_read_input_tokens=cached_tokens,
)
await self.start_llm_usage_metrics(tokens)
await self.stop_processing_metrics()
@@ -810,7 +816,7 @@ class OpenAIRealtimeLLMService(LLMService):
# We're done configuring the LLM for this session
self._llm_needs_conversation_setup = False
logger.debug(f"Creating response")
logger.debug("Creating response")
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()

View File

@@ -235,7 +235,7 @@ class SmallWebRTCClient:
# We are always resampling it for 16000 if the sample_rate that we receive is bigger than that.
# otherwise we face issues with Silero VAD
self._pipecat_resampler = AudioResampler("s16", "mono", 16000)
self._pipecat_resampler = AudioResampler("s16", "mono", 48000)
@self._webrtc_connection.event_handler("connected")
async def on_connected(connection: SmallWebRTCConnection):
@@ -366,31 +366,16 @@ class SmallWebRTCClient:
await asyncio.sleep(0.01)
continue
if frame.sample_rate > self._in_sample_rate:
resampled_frames = self._pipecat_resampler.resample(frame)
for resampled_frame in resampled_frames:
# 16-bit PCM bytes
pcm_array = resampled_frame.to_ndarray().astype(np.int16)
pcm_bytes = pcm_array.tobytes()
del pcm_array # free NumPy array immediately
audio_frame = InputAudioRawFrame(
audio=pcm_bytes,
sample_rate=resampled_frame.sample_rate,
num_channels=self._audio_in_channels,
)
del pcm_bytes # reference kept in audio_frame
yield audio_frame
else:
resampled_frames = self._pipecat_resampler.resample(frame)
for resampled_frame in resampled_frames:
# 16-bit PCM bytes
pcm_array = frame.to_ndarray().astype(np.int16)
pcm_array = resampled_frame.to_ndarray().astype(np.int16)
pcm_bytes = pcm_array.tobytes()
del pcm_array # free NumPy array immediately
audio_frame = InputAudioRawFrame(
audio=pcm_bytes,
sample_rate=frame.sample_rate,
sample_rate=resampled_frame.sample_rate,
num_channels=self._audio_in_channels,
)
del pcm_bytes # reference kept in audio_frame

View File

@@ -92,6 +92,24 @@ def _add_token_usage_to_span(span, token_usage):
span.set_attribute("gen_ai.usage.input_tokens", token_usage["prompt_tokens"])
if "completion_tokens" in token_usage:
span.set_attribute("gen_ai.usage.output_tokens", token_usage["completion_tokens"])
# Add cached token metrics for dictionary
if (
"cache_read_input_tokens" in token_usage
and token_usage["cache_read_input_tokens"] is not None
):
span.set_attribute(
"gen_ai.usage.cache_read_input_tokens", token_usage["cache_read_input_tokens"]
)
if (
"cache_creation_input_tokens" in token_usage
and token_usage["cache_creation_input_tokens"] is not None
):
span.set_attribute(
"gen_ai.usage.cache_creation_input_tokens",
token_usage["cache_creation_input_tokens"],
)
if "reasoning_tokens" in token_usage and token_usage["reasoning_tokens"] is not None:
span.set_attribute("gen_ai.usage.reasoning_tokens", token_usage["reasoning_tokens"])
else:
# Handle LLMTokenUsage object
span.set_attribute("gen_ai.usage.input_tokens", getattr(token_usage, "prompt_tokens", 0))
@@ -99,6 +117,19 @@ def _add_token_usage_to_span(span, token_usage):
"gen_ai.usage.output_tokens", getattr(token_usage, "completion_tokens", 0)
)
# Add cached token metrics for LLMTokenUsage object
cache_read_tokens = getattr(token_usage, "cache_read_input_tokens", None)
if cache_read_tokens is not None:
span.set_attribute("gen_ai.usage.cache_read_input_tokens", cache_read_tokens)
cache_creation_tokens = getattr(token_usage, "cache_creation_input_tokens", None)
if cache_creation_tokens is not None:
span.set_attribute("gen_ai.usage.cache_creation_input_tokens", cache_creation_tokens)
reasoning_tokens = getattr(token_usage, "reasoning_tokens", None)
if reasoning_tokens is not None:
span.set_attribute("gen_ai.usage.reasoning_tokens", reasoning_tokens)
def traced_tts(func: Optional[Callable] = None, *, name: Optional[str] = None) -> Callable:
"""Trace TTS service methods with TTS-specific attributes.
@@ -715,7 +746,7 @@ def traced_gemini_live(operation: str) -> Callable:
else:
operation_attrs["tool.result_status"] = "completed"
except json.JSONDecodeError as e:
except json.JSONDecodeError:
operation_attrs["tool.result"] = (
f"Invalid JSON: {str(result_content)[:500]}"
)