Compare commits
7 Commits
v0.0.90
...
hush/usage
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb6e86e69f | ||
|
|
fce6f55ddb | ||
|
|
d9580f72a9 | ||
|
|
0588c82bbf | ||
|
|
16e9093d5a | ||
|
|
91a5d580fd | ||
|
|
0473556992 |
13
CHANGELOG.md
13
CHANGELOG.md
@@ -5,6 +5,19 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- The runner `--folder` argument now supports downloading files from
|
||||
subdirectories.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue where `RimeHttpTTSService` and `PiperTTSService` could generate
|
||||
incorrectly 16-bit aligned audio frames, potentially leading to internal
|
||||
errors or static audio.
|
||||
|
||||
## [0.0.90] - 2025-10-10
|
||||
|
||||
### Added
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
</div></h1>
|
||||
|
||||
[](https://pypi.org/project/pipecat-ai)  [](https://codecov.io/gh/pipecat-ai/pipecat) [](https://docs.pipecat.ai) [](https://discord.gg/pipecat) [](https://deepwiki.com/pipecat-ai/pipecat)
|
||||
[](https://getmanta.ai/pipecat)
|
||||
|
||||
# 🎙️ Pipecat: Real-Time Voice & Multimodal AI Agents
|
||||
|
||||
|
||||
156
examples/foundational/18-openai-realtime-usage.py
Normal file
156
examples/foundational/18-openai-realtime-usage.py
Normal file
@@ -0,0 +1,156 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Example: Print OpenAI Realtime API Token Usage Statistics
|
||||
|
||||
This example demonstrates how to access and print token usage statistics
|
||||
from the OpenAI Realtime API, including detailed breakdowns of input/output
|
||||
tokens, cached tokens, and audio/text token usage.
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
# We store functions so objects don't get instantiated until the desired
|
||||
# transport gets selected.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
"""Main function demonstrating usage statistics tracking."""
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
# Initialize the OpenAI Realtime service
|
||||
llm = OpenAIRealtimeLLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY") or "",
|
||||
model="gpt-4o-realtime-preview-2024-12-17",
|
||||
)
|
||||
|
||||
# To access usage statistics, we wrap the internal response handler
|
||||
# This is the cleanest way to intercept usage data from the realtime API
|
||||
original_handler = llm._handle_evt_response_done
|
||||
|
||||
async def custom_response_done_handler(evt):
|
||||
"""Custom handler that prints usage stats before calling original handler."""
|
||||
# Print usage statistics if available
|
||||
if evt.response.usage:
|
||||
usage = evt.response.usage
|
||||
|
||||
logger.info("\n" + "=" * 50)
|
||||
logger.info("📊 TOKEN USAGE STATISTICS")
|
||||
logger.info("=" * 50)
|
||||
logger.info(f"Total tokens: {usage.total_tokens}")
|
||||
logger.info(f"Input tokens: {usage.input_tokens}")
|
||||
logger.info(f"Output tokens: {usage.output_tokens}")
|
||||
|
||||
# Input token details
|
||||
if usage.input_token_details:
|
||||
logger.info(f"\n📥 Input token breakdown:")
|
||||
logger.info(f" • Cached tokens: {usage.input_token_details.cached_tokens}")
|
||||
logger.info(f" • Text tokens: {usage.input_token_details.text_tokens}")
|
||||
logger.info(f" • Audio tokens: {usage.input_token_details.audio_tokens}")
|
||||
|
||||
# Cached token details if available
|
||||
if usage.input_token_details.cached_tokens_details:
|
||||
logger.info(
|
||||
f" • Cached text tokens: {usage.input_token_details.cached_tokens_details.text_tokens}"
|
||||
)
|
||||
logger.info(
|
||||
f" • Cached audio tokens: {usage.input_token_details.cached_tokens_details.audio_tokens}"
|
||||
)
|
||||
|
||||
# Output token details
|
||||
if usage.output_token_details:
|
||||
logger.info(f"\n📤 Output token breakdown:")
|
||||
logger.info(f" • Text tokens: {usage.output_token_details.text_tokens}")
|
||||
logger.info(f" • Audio tokens: {usage.output_token_details.audio_tokens}")
|
||||
|
||||
logger.info("=" * 50 + "\n")
|
||||
|
||||
# Call the original handler to maintain normal functionality
|
||||
await original_handler(evt)
|
||||
|
||||
# Replace the handler with our custom one
|
||||
llm._handle_evt_response_done = custom_response_done_handler
|
||||
|
||||
# Create pipeline
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
llm,
|
||||
transport.output(),
|
||||
]
|
||||
)
|
||||
|
||||
# Create task
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info("Client connected")
|
||||
logger.info("🎤 Speak into your microphone to interact with the assistant")
|
||||
logger.info("📊 Usage statistics will be printed after each response")
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info("Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -217,7 +217,7 @@ def _setup_webrtc_routes(
|
||||
"""Redirect root requests to client interface."""
|
||||
return RedirectResponse(url="/client/")
|
||||
|
||||
@app.get("/files/{filename}")
|
||||
@app.get("/files/{filename:path}")
|
||||
async def download_file(filename: str):
|
||||
"""Handle file downloads."""
|
||||
if not folder:
|
||||
|
||||
@@ -14,7 +14,6 @@ from loguru import logger
|
||||
from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
TTSAudioRawFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
@@ -99,16 +98,15 @@ class PiperTTSService(TTSService):
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
yield TTSStartedFrame()
|
||||
|
||||
CHUNK_SIZE = self.chunk_size
|
||||
|
||||
yield TTSStartedFrame()
|
||||
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
|
||||
# remove wav header if present
|
||||
if chunk.startswith(b"RIFF"):
|
||||
chunk = chunk[44:]
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
async for frame in self._stream_audio_frames_from_iterator(
|
||||
response.content.iter_chunked(CHUNK_SIZE), strip_wav_header=True
|
||||
):
|
||||
await self.stop_ttfb_metrics()
|
||||
yield frame
|
||||
except Exception as e:
|
||||
logger.error(f"Error in run_tts: {e}")
|
||||
yield ErrorFrame(error=str(e))
|
||||
|
||||
@@ -553,15 +553,13 @@ class RimeHttpTTSService(TTSService):
|
||||
|
||||
CHUNK_SIZE = self.chunk_size
|
||||
|
||||
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
|
||||
if need_to_strip_wav_header and chunk.startswith(b"RIFF"):
|
||||
chunk = chunk[44:]
|
||||
need_to_strip_wav_header = False
|
||||
async for frame in self._stream_audio_frames_from_iterator(
|
||||
response.content.iter_chunked(CHUNK_SIZE),
|
||||
strip_wav_header=need_to_strip_wav_header,
|
||||
):
|
||||
await self.stop_ttfb_metrics()
|
||||
yield frame
|
||||
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
except Exception as e:
|
||||
logger.exception(f"Error generating TTS: {e}")
|
||||
yield ErrorFrame(error=f"Rime TTS error: {str(e)}")
|
||||
|
||||
@@ -8,7 +8,17 @@
|
||||
|
||||
import asyncio
|
||||
from abc import abstractmethod
|
||||
from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional, Sequence, Tuple
|
||||
from typing import (
|
||||
Any,
|
||||
AsyncGenerator,
|
||||
AsyncIterator,
|
||||
Dict,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -374,6 +384,36 @@ class TTSService(AIService):
|
||||
):
|
||||
await self._stop_frame_queue.put(frame)
|
||||
|
||||
async def _stream_audio_frames_from_iterator(
|
||||
self, iterator: AsyncIterator[bytes], *, strip_wav_header: bool
|
||||
) -> AsyncGenerator[Frame, None]:
|
||||
buffer = bytearray()
|
||||
need_to_strip_wav_header = strip_wav_header
|
||||
async for chunk in iterator:
|
||||
if need_to_strip_wav_header and chunk.startswith(b"RIFF"):
|
||||
chunk = chunk[44:]
|
||||
need_to_strip_wav_header = False
|
||||
|
||||
# Append to current buffer.
|
||||
buffer.extend(chunk)
|
||||
|
||||
# Round to nearest even number.
|
||||
aligned_length = len(buffer) & ~1 # 111111111...11110
|
||||
if aligned_length > 0:
|
||||
aligned_chunk = buffer[:aligned_length]
|
||||
buffer = buffer[aligned_length:] # keep any leftover byte
|
||||
|
||||
if len(aligned_chunk) > 0:
|
||||
frame = TTSAudioRawFrame(bytes(aligned_chunk), self.sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
if len(buffer) > 0:
|
||||
# Make sure we don't need an extra padding byte.
|
||||
if len(buffer) % 2 == 1:
|
||||
buffer.extend(b"\x00")
|
||||
frame = TTSAudioRawFrame(bytes(buffer), self.sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
|
||||
self._processing_text = False
|
||||
await self._text_aggregator.handle_interruption()
|
||||
|
||||
Reference in New Issue
Block a user