Compare commits

..

3 Commits

Author SHA1 Message Date
James Hush
cb6e86e69f docs: add a demo showing how to track usage 2025-10-16 13:45:42 +08:00
Aleix Conchillo Flaqué
fce6f55ddb Merge pull request #2844 from pipecat-ai/aleix/runner-files-path
runner: allow subdirectories in --folder
2025-10-14 08:38:38 -07:00
Aleix Conchillo Flaqué
d9580f72a9 runner: allow subdirectories in --folder 2025-10-13 18:29:19 -07:00
4 changed files with 162 additions and 15 deletions

View File

@@ -9,9 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `daily_room_properties` parameter to `configure()` function in
`pipecat.runner.daily`, allowing custom `DailyRoomProperties` to be passed
for more flexible room configuration.
- The runner `--folder` argument now supports downloading files from
subdirectories.
### Fixed

View File

@@ -0,0 +1,156 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: Print OpenAI Realtime API Token Usage Statistics
This example demonstrates how to access and print token usage statistics
from the OpenAI Realtime API, including detailed breakdowns of input/output
tokens, cached tokens, and audio/text token usage.
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects don't get instantiated until the desired
# transport gets selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Main function demonstrating usage statistics tracking."""
logger.info(f"Starting bot")
# Initialize the OpenAI Realtime service
llm = OpenAIRealtimeLLMService(
api_key=os.getenv("OPENAI_API_KEY") or "",
model="gpt-4o-realtime-preview-2024-12-17",
)
# To access usage statistics, we wrap the internal response handler
# This is the cleanest way to intercept usage data from the realtime API
original_handler = llm._handle_evt_response_done
async def custom_response_done_handler(evt):
"""Custom handler that prints usage stats before calling original handler."""
# Print usage statistics if available
if evt.response.usage:
usage = evt.response.usage
logger.info("\n" + "=" * 50)
logger.info("📊 TOKEN USAGE STATISTICS")
logger.info("=" * 50)
logger.info(f"Total tokens: {usage.total_tokens}")
logger.info(f"Input tokens: {usage.input_tokens}")
logger.info(f"Output tokens: {usage.output_tokens}")
# Input token details
if usage.input_token_details:
logger.info(f"\n📥 Input token breakdown:")
logger.info(f" • Cached tokens: {usage.input_token_details.cached_tokens}")
logger.info(f" • Text tokens: {usage.input_token_details.text_tokens}")
logger.info(f" • Audio tokens: {usage.input_token_details.audio_tokens}")
# Cached token details if available
if usage.input_token_details.cached_tokens_details:
logger.info(
f" • Cached text tokens: {usage.input_token_details.cached_tokens_details.text_tokens}"
)
logger.info(
f" • Cached audio tokens: {usage.input_token_details.cached_tokens_details.audio_tokens}"
)
# Output token details
if usage.output_token_details:
logger.info(f"\n📤 Output token breakdown:")
logger.info(f" • Text tokens: {usage.output_token_details.text_tokens}")
logger.info(f" • Audio tokens: {usage.output_token_details.audio_tokens}")
logger.info("=" * 50 + "\n")
# Call the original handler to maintain normal functionality
await original_handler(evt)
# Replace the handler with our custom one
llm._handle_evt_response_done = custom_response_done_handler
# Create pipeline
pipeline = Pipeline(
[
transport.input(),
llm,
transport.output(),
]
)
# Create task
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
logger.info("🎤 Speak into your microphone to interact with the assistant")
logger.info("📊 Usage statistics will be printed after each response")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -42,7 +42,6 @@ from typing import Dict, List, Optional
import aiohttp
from loguru import logger
from pydantic import BaseModel
from typing_extensions import Literal
from pipecat.transports.daily.utils import (
DailyRESTHelper,
@@ -83,8 +82,6 @@ async def configure(
sip_enable_video: Optional[bool] = False,
sip_num_endpoints: Optional[int] = 1,
sip_codecs: Optional[Dict[str, List[str]]] = None,
enable_recording: Optional[Literal["cloud", "local", "raw-tracks"]] = None,
daily_room_properties: Optional[DailyRoomProperties] = None,
) -> DailyRoomConfig:
"""Configure Daily room URL and token with optional SIP capabilities.
@@ -102,9 +99,6 @@ async def configure(
sip_num_endpoints: Number of allowed SIP endpoints.
sip_codecs: Codecs to support for audio and video. If None, uses Daily defaults.
Example: {"audio": ["OPUS"], "video": ["H264"]}
enable_recording: Recording mode for the room. One of "cloud", "local", or "raw-tracks".
daily_room_properties: Custom room properties to use instead of default configuration.
If provided, overrides all other property settings.
Returns:
DailyRoomConfig: Object with room_url, token, and optional sip_endpoint.
@@ -161,7 +155,8 @@ async def configure(
# Create room properties
room_properties = DailyRoomProperties(
exp=expiration_time, eject_at_room_exp=True, enable_recording=enable_recording
exp=expiration_time,
eject_at_room_exp=True,
)
# Add SIP configuration if enabled
@@ -178,10 +173,7 @@ async def configure(
room_properties.start_video_off = not sip_enable_video # Voice-only by default
# Create room parameters
if daily_room_properties:
room_params = DailyRoomParams(name=room_name, properties=daily_room_properties)
else:
room_params = DailyRoomParams(name=room_name, properties=room_properties)
room_params = DailyRoomParams(name=room_name, properties=room_properties)
try:
room_response = await daily_rest_helper.create_room(room_params)

View File

@@ -217,7 +217,7 @@ def _setup_webrtc_routes(
"""Redirect root requests to client interface."""
return RedirectResponse(url="/client/")
@app.get("/files/{filename}")
@app.get("/files/{filename:path}")
async def download_file(filename: str):
"""Handle file downloads."""
if not folder: