Compare commits

...

7 Commits

Author SHA1 Message Date
James Hush
cb6e86e69f docs: add a demo showing how to track usage 2025-10-16 13:45:42 +08:00
Aleix Conchillo Flaqué
fce6f55ddb Merge pull request #2844 from pipecat-ai/aleix/runner-files-path
runner: allow subdirectories in --folder
2025-10-14 08:38:38 -07:00
Aleix Conchillo Flaqué
d9580f72a9 runner: allow subdirectories in --folder 2025-10-13 18:29:19 -07:00
Aleix Conchillo Flaqué
0588c82bbf Merge pull request #2838 from makosst/manta_graph_readme
Added Manta Graph to README
2025-10-11 09:31:21 -07:00
makosst
16e9093d5a Added Manta Graph to README 2025-10-11 09:20:17 -07:00
Aleix Conchillo Flaqué
91a5d580fd Merge pull request #2835 from pipecat-ai/aleix/tts-http-aligned-audio-frames
tts: fix RimeHttpTTSService/PiperTTSService 16-bit audio frames alignment
2025-10-10 14:20:44 -07:00
Aleix Conchillo Flaqué
0473556992 tts: fix RimeHttpTTSService/PiperTTSService 16-bit audio frames alignment 2025-10-10 14:19:22 -07:00
7 changed files with 225 additions and 19 deletions

View File

@@ -5,6 +5,19 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- The runner `--folder` argument now supports downloading files from
subdirectories.
### Fixed
- Fixed an issue where `RimeHttpTTSService` and `PiperTTSService` could generate
incorrectly 16-bit aligned audio frames, potentially leading to internal
errors or static audio.
## [0.0.90] - 2025-10-10
### Added

View File

@@ -3,6 +3,7 @@
</div></h1>
[![PyPI](https://img.shields.io/pypi/v/pipecat-ai)](https://pypi.org/project/pipecat-ai) ![Tests](https://github.com/pipecat-ai/pipecat/actions/workflows/tests.yaml/badge.svg) [![codecov](https://codecov.io/gh/pipecat-ai/pipecat/graph/badge.svg?token=LNVUIVO4Y9)](https://codecov.io/gh/pipecat-ai/pipecat) [![Docs](https://img.shields.io/badge/Documentation-blue)](https://docs.pipecat.ai) [![Discord](https://img.shields.io/discord/1239284677165056021)](https://discord.gg/pipecat) [![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/pipecat-ai/pipecat)
[![](https://getmanta.ai/api/badges?text=Manta%20Graph&link=manta)](https://getmanta.ai/pipecat)
# 🎙️ Pipecat: Real-Time Voice & Multimodal AI Agents

View File

@@ -0,0 +1,156 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: Print OpenAI Realtime API Token Usage Statistics
This example demonstrates how to access and print token usage statistics
from the OpenAI Realtime API, including detailed breakdowns of input/output
tokens, cached tokens, and audio/text token usage.
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects don't get instantiated until the desired
# transport gets selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Main function demonstrating usage statistics tracking."""
logger.info(f"Starting bot")
# Initialize the OpenAI Realtime service
llm = OpenAIRealtimeLLMService(
api_key=os.getenv("OPENAI_API_KEY") or "",
model="gpt-4o-realtime-preview-2024-12-17",
)
# To access usage statistics, we wrap the internal response handler
# This is the cleanest way to intercept usage data from the realtime API
original_handler = llm._handle_evt_response_done
async def custom_response_done_handler(evt):
"""Custom handler that prints usage stats before calling original handler."""
# Print usage statistics if available
if evt.response.usage:
usage = evt.response.usage
logger.info("\n" + "=" * 50)
logger.info("📊 TOKEN USAGE STATISTICS")
logger.info("=" * 50)
logger.info(f"Total tokens: {usage.total_tokens}")
logger.info(f"Input tokens: {usage.input_tokens}")
logger.info(f"Output tokens: {usage.output_tokens}")
# Input token details
if usage.input_token_details:
logger.info(f"\n📥 Input token breakdown:")
logger.info(f" • Cached tokens: {usage.input_token_details.cached_tokens}")
logger.info(f" • Text tokens: {usage.input_token_details.text_tokens}")
logger.info(f" • Audio tokens: {usage.input_token_details.audio_tokens}")
# Cached token details if available
if usage.input_token_details.cached_tokens_details:
logger.info(
f" • Cached text tokens: {usage.input_token_details.cached_tokens_details.text_tokens}"
)
logger.info(
f" • Cached audio tokens: {usage.input_token_details.cached_tokens_details.audio_tokens}"
)
# Output token details
if usage.output_token_details:
logger.info(f"\n📤 Output token breakdown:")
logger.info(f" • Text tokens: {usage.output_token_details.text_tokens}")
logger.info(f" • Audio tokens: {usage.output_token_details.audio_tokens}")
logger.info("=" * 50 + "\n")
# Call the original handler to maintain normal functionality
await original_handler(evt)
# Replace the handler with our custom one
llm._handle_evt_response_done = custom_response_done_handler
# Create pipeline
pipeline = Pipeline(
[
transport.input(),
llm,
transport.output(),
]
)
# Create task
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
logger.info("🎤 Speak into your microphone to interact with the assistant")
logger.info("📊 Usage statistics will be printed after each response")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -217,7 +217,7 @@ def _setup_webrtc_routes(
"""Redirect root requests to client interface."""
return RedirectResponse(url="/client/")
@app.get("/files/{filename}")
@app.get("/files/{filename:path}")
async def download_file(filename: str):
"""Handle file downloads."""
if not folder:

View File

@@ -14,7 +14,6 @@ from loguru import logger
from pipecat.frames.frames import (
ErrorFrame,
Frame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
@@ -99,16 +98,15 @@ class PiperTTSService(TTSService):
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
CHUNK_SIZE = self.chunk_size
yield TTSStartedFrame()
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
# remove wav header if present
if chunk.startswith(b"RIFF"):
chunk = chunk[44:]
if len(chunk) > 0:
await self.stop_ttfb_metrics()
yield TTSAudioRawFrame(chunk, self.sample_rate, 1)
async for frame in self._stream_audio_frames_from_iterator(
response.content.iter_chunked(CHUNK_SIZE), strip_wav_header=True
):
await self.stop_ttfb_metrics()
yield frame
except Exception as e:
logger.error(f"Error in run_tts: {e}")
yield ErrorFrame(error=str(e))

View File

@@ -553,15 +553,13 @@ class RimeHttpTTSService(TTSService):
CHUNK_SIZE = self.chunk_size
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
if need_to_strip_wav_header and chunk.startswith(b"RIFF"):
chunk = chunk[44:]
need_to_strip_wav_header = False
async for frame in self._stream_audio_frames_from_iterator(
response.content.iter_chunked(CHUNK_SIZE),
strip_wav_header=need_to_strip_wav_header,
):
await self.stop_ttfb_metrics()
yield frame
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
except Exception as e:
logger.exception(f"Error generating TTS: {e}")
yield ErrorFrame(error=f"Rime TTS error: {str(e)}")

View File

@@ -8,7 +8,17 @@
import asyncio
from abc import abstractmethod
from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional, Sequence, Tuple
from typing import (
Any,
AsyncGenerator,
AsyncIterator,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
)
from loguru import logger
@@ -374,6 +384,36 @@ class TTSService(AIService):
):
await self._stop_frame_queue.put(frame)
async def _stream_audio_frames_from_iterator(
self, iterator: AsyncIterator[bytes], *, strip_wav_header: bool
) -> AsyncGenerator[Frame, None]:
buffer = bytearray()
need_to_strip_wav_header = strip_wav_header
async for chunk in iterator:
if need_to_strip_wav_header and chunk.startswith(b"RIFF"):
chunk = chunk[44:]
need_to_strip_wav_header = False
# Append to current buffer.
buffer.extend(chunk)
# Round to nearest even number.
aligned_length = len(buffer) & ~1 # 111111111...11110
if aligned_length > 0:
aligned_chunk = buffer[:aligned_length]
buffer = buffer[aligned_length:] # keep any leftover byte
if len(aligned_chunk) > 0:
frame = TTSAudioRawFrame(bytes(aligned_chunk), self.sample_rate, 1)
yield frame
if len(buffer) > 0:
# Make sure we don't need an extra padding byte.
if len(buffer) % 2 == 1:
buffer.extend(b"\x00")
frame = TTSAudioRawFrame(bytes(buffer), self.sample_rate, 1)
yield frame
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
self._processing_text = False
await self._text_aggregator.handle_interruption()