Compare commits
1 Commits
mb/remove-
...
hush/usage
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb6e86e69f |
156
examples/foundational/18-openai-realtime-usage.py
Normal file
156
examples/foundational/18-openai-realtime-usage.py
Normal file
@@ -0,0 +1,156 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2024–2025, Daily
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: BSD 2-Clause License
|
||||||
|
#
|
||||||
|
|
||||||
|
"""Example: Print OpenAI Realtime API Token Usage Statistics
|
||||||
|
|
||||||
|
This example demonstrates how to access and print token usage statistics
|
||||||
|
from the OpenAI Realtime API, including detailed breakdowns of input/output
|
||||||
|
tokens, cached tokens, and audio/text token usage.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||||
|
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||||
|
from pipecat.pipeline.pipeline import Pipeline
|
||||||
|
from pipecat.pipeline.runner import PipelineRunner
|
||||||
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||||
|
from pipecat.runner.types import RunnerArguments
|
||||||
|
from pipecat.runner.utils import create_transport
|
||||||
|
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
|
||||||
|
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||||
|
from pipecat.transports.daily.transport import DailyParams
|
||||||
|
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||||
|
|
||||||
|
load_dotenv(override=True)
|
||||||
|
|
||||||
|
# We store functions so objects don't get instantiated until the desired
|
||||||
|
# transport gets selected.
|
||||||
|
transport_params = {
|
||||||
|
"daily": lambda: DailyParams(
|
||||||
|
audio_in_enabled=True,
|
||||||
|
audio_out_enabled=True,
|
||||||
|
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||||
|
),
|
||||||
|
"twilio": lambda: FastAPIWebsocketParams(
|
||||||
|
audio_in_enabled=True,
|
||||||
|
audio_out_enabled=True,
|
||||||
|
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||||
|
),
|
||||||
|
"webrtc": lambda: TransportParams(
|
||||||
|
audio_in_enabled=True,
|
||||||
|
audio_out_enabled=True,
|
||||||
|
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||||
|
"""Main function demonstrating usage statistics tracking."""
|
||||||
|
logger.info(f"Starting bot")
|
||||||
|
|
||||||
|
# Initialize the OpenAI Realtime service
|
||||||
|
llm = OpenAIRealtimeLLMService(
|
||||||
|
api_key=os.getenv("OPENAI_API_KEY") or "",
|
||||||
|
model="gpt-4o-realtime-preview-2024-12-17",
|
||||||
|
)
|
||||||
|
|
||||||
|
# To access usage statistics, we wrap the internal response handler
|
||||||
|
# This is the cleanest way to intercept usage data from the realtime API
|
||||||
|
original_handler = llm._handle_evt_response_done
|
||||||
|
|
||||||
|
async def custom_response_done_handler(evt):
|
||||||
|
"""Custom handler that prints usage stats before calling original handler."""
|
||||||
|
# Print usage statistics if available
|
||||||
|
if evt.response.usage:
|
||||||
|
usage = evt.response.usage
|
||||||
|
|
||||||
|
logger.info("\n" + "=" * 50)
|
||||||
|
logger.info("📊 TOKEN USAGE STATISTICS")
|
||||||
|
logger.info("=" * 50)
|
||||||
|
logger.info(f"Total tokens: {usage.total_tokens}")
|
||||||
|
logger.info(f"Input tokens: {usage.input_tokens}")
|
||||||
|
logger.info(f"Output tokens: {usage.output_tokens}")
|
||||||
|
|
||||||
|
# Input token details
|
||||||
|
if usage.input_token_details:
|
||||||
|
logger.info(f"\n📥 Input token breakdown:")
|
||||||
|
logger.info(f" • Cached tokens: {usage.input_token_details.cached_tokens}")
|
||||||
|
logger.info(f" • Text tokens: {usage.input_token_details.text_tokens}")
|
||||||
|
logger.info(f" • Audio tokens: {usage.input_token_details.audio_tokens}")
|
||||||
|
|
||||||
|
# Cached token details if available
|
||||||
|
if usage.input_token_details.cached_tokens_details:
|
||||||
|
logger.info(
|
||||||
|
f" • Cached text tokens: {usage.input_token_details.cached_tokens_details.text_tokens}"
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
f" • Cached audio tokens: {usage.input_token_details.cached_tokens_details.audio_tokens}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Output token details
|
||||||
|
if usage.output_token_details:
|
||||||
|
logger.info(f"\n📤 Output token breakdown:")
|
||||||
|
logger.info(f" • Text tokens: {usage.output_token_details.text_tokens}")
|
||||||
|
logger.info(f" • Audio tokens: {usage.output_token_details.audio_tokens}")
|
||||||
|
|
||||||
|
logger.info("=" * 50 + "\n")
|
||||||
|
|
||||||
|
# Call the original handler to maintain normal functionality
|
||||||
|
await original_handler(evt)
|
||||||
|
|
||||||
|
# Replace the handler with our custom one
|
||||||
|
llm._handle_evt_response_done = custom_response_done_handler
|
||||||
|
|
||||||
|
# Create pipeline
|
||||||
|
pipeline = Pipeline(
|
||||||
|
[
|
||||||
|
transport.input(),
|
||||||
|
llm,
|
||||||
|
transport.output(),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create task
|
||||||
|
task = PipelineTask(
|
||||||
|
pipeline,
|
||||||
|
params=PipelineParams(
|
||||||
|
allow_interruptions=True,
|
||||||
|
enable_metrics=True,
|
||||||
|
enable_usage_metrics=True,
|
||||||
|
),
|
||||||
|
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||||
|
)
|
||||||
|
|
||||||
|
@transport.event_handler("on_client_connected")
|
||||||
|
async def on_client_connected(transport, client):
|
||||||
|
logger.info("Client connected")
|
||||||
|
logger.info("🎤 Speak into your microphone to interact with the assistant")
|
||||||
|
logger.info("📊 Usage statistics will be printed after each response")
|
||||||
|
|
||||||
|
@transport.event_handler("on_client_disconnected")
|
||||||
|
async def on_client_disconnected(transport, client):
|
||||||
|
logger.info("Client disconnected")
|
||||||
|
await task.cancel()
|
||||||
|
|
||||||
|
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||||
|
|
||||||
|
await runner.run(task)
|
||||||
|
|
||||||
|
|
||||||
|
async def bot(runner_args: RunnerArguments):
|
||||||
|
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||||
|
transport = await create_transport(runner_args, transport_params)
|
||||||
|
await run_bot(transport, runner_args)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
from pipecat.runner.run import main
|
||||||
|
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user