Compare commits
1 Commits
v0.0.58
...
hush/realt
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
230d92850a |
15
CHANGELOG.md
15
CHANGELOG.md
@@ -5,13 +5,10 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [0.0.58] - 2025-02-26
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- Added track-specific audio event `on_track_audio_data` to
|
||||
`AudioBufferProcessor` for accessing separate input and output audio tracks.
|
||||
|
||||
- Pipecat version will now be logged on every application startup. This will
|
||||
help us identify what version we are running in case of any issues.
|
||||
|
||||
@@ -48,10 +45,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- ⚠️ `PipelineTask` now requires keyword arguments (except for the first one for
|
||||
the pipeline).
|
||||
|
||||
- Updated `PlayHTHttpTTSService` to take a `voice_engine` and `protocol` input
|
||||
in the constructor. The previous method of providing a `voice_engine` input
|
||||
that contains the engine and protocol is deprecated by PlayHT.
|
||||
|
||||
- The base `TTSService` class now strips leading newlines before sending text
|
||||
to the TTS provider. This change is to solve issues where some TTS providers,
|
||||
like Azure, would not output text due to newlines.
|
||||
@@ -85,9 +78,6 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed a `GoogleLLMService` that was causing an exception when sending inline
|
||||
audio in some cases.
|
||||
|
||||
- Fixed an `AudioContextWordTTSService` issue that would cause an `EndFrame` to
|
||||
disconnect from the TTS service before audio from all the contexts was
|
||||
received. This affected services like Cartesia and Rime.
|
||||
@@ -134,9 +124,6 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
|
||||
|
||||
- Added Gemini support to `examples/phone-chatbot`.
|
||||
|
||||
- Added foundational example `34-audio-recording.py` showing how to use the
|
||||
AudioBufferProcessor callbacks to save merged and track recordings.
|
||||
|
||||
## [0.0.57] - 2025-02-14
|
||||
|
||||
### Added
|
||||
|
||||
@@ -1,103 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.elevenlabs import ElevenLabsHttpTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
tts = ElevenLabsHttpTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
aiohttp_session=session,
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -1,103 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.services.rime import RimeHttpTTSService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
),
|
||||
)
|
||||
|
||||
tts = RimeHttpTTSService(
|
||||
api_key=os.getenv("RIME_API_KEY", ""),
|
||||
voice_id="rex",
|
||||
aiohttp_session=session,
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -16,10 +16,13 @@ from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.transcript_processor import TranscriptProcessor
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.openai_realtime_beta import (
|
||||
InputAudioTranscription,
|
||||
OpenAIRealtimeBetaLLMService,
|
||||
@@ -140,14 +143,22 @@ Remember, your responses should be short. Just one or two sentences, usually."""
|
||||
tools,
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
# Create transcript processor and handler
|
||||
transcript = TranscriptProcessor()
|
||||
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
transcript.user(), # User transcripts
|
||||
context_aggregator.user(),
|
||||
llm, # LLM
|
||||
context_aggregator.assistant(),
|
||||
transcript.assistant(), # Assistant transcripts
|
||||
transport.output(), # Transport bot output
|
||||
]
|
||||
)
|
||||
@@ -162,9 +173,16 @@ Remember, your responses should be short. Just one or two sentences, usually."""
|
||||
),
|
||||
)
|
||||
|
||||
# Register event handler for transcript updates
|
||||
@transcript.event_handler("on_transcript_update")
|
||||
async def on_transcript_update(processor, frame):
|
||||
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
|
||||
for msg in frame.messages:
|
||||
logger.debug(msg)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# await transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
|
||||
@@ -389,7 +389,7 @@ class AudioAccumulator(FrameProcessor):
|
||||
)
|
||||
self._user_speaking = False
|
||||
context = GoogleLLMContext()
|
||||
context.add_audio_frames_message(audio_frames=self._audio_frames)
|
||||
context.add_audio_frames_message(text="Audio follows", audio_frames=self._audio_frames)
|
||||
await self.push_frame(OpenAILLMContextFrame(context=context))
|
||||
elif isinstance(frame, InputAudioRawFrame):
|
||||
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest
|
||||
|
||||
@@ -1,186 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Audio Recording Example with Pipecat.
|
||||
|
||||
This example demonstrates how to record audio from a conversation between a user and an AI assistant,
|
||||
saving both merged and individual audio tracks. It showcases the AudioBufferProcessor's capabilities
|
||||
to handle both combined and separate audio streams.
|
||||
|
||||
The example:
|
||||
1. Sets up a basic conversation with an AI assistant
|
||||
2. Records the entire conversation
|
||||
3. Saves three separate WAV files:
|
||||
- A merged recording of both participants
|
||||
- Individual recording of user audio
|
||||
- Individual recording of assistant audio
|
||||
|
||||
Example usage (run from pipecat root directory):
|
||||
$ pip install "pipecat-ai[daily,openai,cartesia,silero]"
|
||||
$ pip install -r dev-requirements.txt
|
||||
$ python examples/foundational/34-audio-recording.py
|
||||
|
||||
Requirements:
|
||||
- OpenAI API key (for GPT-4)
|
||||
- Cartesia API key (for text-to-speech)
|
||||
- Daily API key (for video/audio transport)
|
||||
|
||||
Environment variables (.env file):
|
||||
OPENAI_API_KEY=your_openai_key
|
||||
CARTESIA_API_KEY=your_cartesia_key
|
||||
DAILY_API_KEY=your_daily_key
|
||||
|
||||
The recordings will be saved in a 'recordings' directory with timestamps:
|
||||
recordings/
|
||||
merged_20240315_123456.wav (Combined audio)
|
||||
user_20240315_123456.wav (User audio only)
|
||||
bot_20240315_123456.wav (Bot audio only)
|
||||
|
||||
Note:
|
||||
This example requires the AudioBufferProcessor with track-specific audio support,
|
||||
which provides both 'on_audio_data' and 'on_track_audio_data' events for
|
||||
handling merged and separate audio tracks respectively.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import datetime
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
import wave
|
||||
|
||||
import aiofiles
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def save_audio_file(audio: bytes, filename: str, sample_rate: int, num_channels: int):
|
||||
"""Save audio data to a WAV file."""
|
||||
if len(audio) > 0:
|
||||
with io.BytesIO() as buffer:
|
||||
with wave.open(buffer, "wb") as wf:
|
||||
wf.setsampwidth(2)
|
||||
wf.setnchannels(num_channels)
|
||||
wf.setframerate(sample_rate)
|
||||
wf.writeframes(audio)
|
||||
async with aiofiles.open(filename, "wb") as file:
|
||||
await file.write(buffer.getvalue())
|
||||
logger.info(f"Audio saved to {filename}")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Recording bot",
|
||||
DailyParams(
|
||||
# audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True, # Enable audio passthrough for recording
|
||||
),
|
||||
)
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4")
|
||||
|
||||
# Create audio buffer processor
|
||||
audiobuffer = AudioBufferProcessor()
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful assistant demonstrating audio recording capabilities. Keep your responses brief and clear.",
|
||||
},
|
||||
]
|
||||
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
audiobuffer, # Add audio buffer to pipeline
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await audiobuffer.start_recording()
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Greet the user and explain that this conversation will be recorded.",
|
||||
}
|
||||
)
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await audiobuffer.stop_recording()
|
||||
await task.cancel()
|
||||
|
||||
# Handler for merged audio
|
||||
@audiobuffer.event_handler("on_audio_data")
|
||||
async def on_audio_data(buffer, audio, sample_rate, num_channels):
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"recordings/merged_{timestamp}.wav"
|
||||
os.makedirs("recordings", exist_ok=True)
|
||||
await save_audio_file(audio, filename, sample_rate, num_channels)
|
||||
|
||||
# Handler for separate tracks
|
||||
@audiobuffer.event_handler("on_track_audio_data")
|
||||
async def on_track_audio_data(buffer, user_audio, bot_audio, sample_rate, num_channels):
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
os.makedirs("recordings", exist_ok=True)
|
||||
|
||||
# Save user audio
|
||||
user_filename = f"recordings/user_{timestamp}.wav"
|
||||
await save_audio_file(user_audio, user_filename, sample_rate, 1)
|
||||
|
||||
# Save bot audio
|
||||
bot_filename = f"recordings/bot_{timestamp}.wav"
|
||||
await save_audio_file(bot_audio, bot_filename, sample_rate, 1)
|
||||
|
||||
runner = PipelineRunner()
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -7,15 +7,20 @@ import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import google.ai.generativelanguage as glm
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
EndTaskFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
SystemFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
@@ -23,11 +28,11 @@ from pipecat.frames.frames import (
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.ai_services import LLMService
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.services.google import GoogleLLMService
|
||||
from pipecat.services.google.google import GoogleLLMContext
|
||||
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
|
||||
from pipecat.transports.services.daily import DailyDialinSettings, DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
@@ -235,7 +240,7 @@ If it sounds like a human (saying hello, asking questions, etc.), call the funct
|
||||
DO NOT say anything until you've determined if this is a voicemail or human."""
|
||||
|
||||
llm = GoogleLLMService(
|
||||
model="models/gemini-2.0-flash-lite",
|
||||
model="models/gemini-2.0-flash-lite-preview-02-05",
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
system_instruction=system_instruction,
|
||||
tools=tools,
|
||||
|
||||
@@ -20,14 +20,17 @@ classifiers = [
|
||||
"Topic :: Scientific/Engineering :: Artificial Intelligence"
|
||||
]
|
||||
dependencies = [
|
||||
"aiohttp~=3.11.13",
|
||||
"aiohttp~=3.11.11",
|
||||
"audioop-lts~=0.2.1; python_version>='3.13'",
|
||||
# We need an older version of `httpx` that doesn't remove the deprecated
|
||||
# `proxies` argument. This is necessary for Azure and Anthropic clients.
|
||||
"httpx~=0.27.2",
|
||||
"loguru~=0.7.3",
|
||||
"Markdown~=3.7",
|
||||
"numpy~=1.26.4",
|
||||
"Pillow~=11.1.0",
|
||||
"protobuf~=5.29.3",
|
||||
"pydantic~=2.10.6",
|
||||
"pydantic~=2.10.5",
|
||||
"pyloudnorm~=0.1.1",
|
||||
"resampy~=0.4.3",
|
||||
"soxr~=0.5.0",
|
||||
@@ -39,7 +42,7 @@ Source = "https://github.com/pipecat-ai/pipecat"
|
||||
Website = "https://pipecat.ai"
|
||||
|
||||
[project.optional-dependencies]
|
||||
anthropic = [ "anthropic~=0.47.2" ]
|
||||
anthropic = [ "anthropic~=0.45.2" ]
|
||||
assemblyai = [ "assemblyai~=0.36.0" ]
|
||||
aws = [ "boto3~=1.35.99" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
|
||||
@@ -47,13 +50,13 @@ canonical = [ "aiofiles~=24.1.0" ]
|
||||
cartesia = [ "cartesia~=1.3.1", "websockets~=13.1" ]
|
||||
cerebras = []
|
||||
deepseek = []
|
||||
daily = [ "daily-python~=0.15.0" ]
|
||||
daily = [ "daily-python~=0.14.2" ]
|
||||
deepgram = [ "deepgram-sdk~=3.8.0" ]
|
||||
elevenlabs = [ "websockets~=13.1" ]
|
||||
fal = [ "fal-client~=0.5.6" ]
|
||||
fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ]
|
||||
gladia = [ "websockets~=13.1" ]
|
||||
google = [ "google-cloud-speech~=2.31.0", "google-cloud-texttospeech~=2.25.0", "google-genai~=1.3.0", "google-generativeai~=0.8.4" ]
|
||||
google = [ "google-cloud-speech~=2.31.0", "google-cloud-texttospeech~=2.25.0", "google-genai~=1.2.0", "google-generativeai~=0.8.4" ]
|
||||
grok = []
|
||||
groq = []
|
||||
gstreamer = [ "pygobject~=3.50.0" ]
|
||||
|
||||
@@ -21,32 +21,20 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
class AudioBufferProcessor(FrameProcessor):
|
||||
"""Processes and buffers audio frames from both input (user) and output (bot) sources.
|
||||
"""This processor buffers audio raw frames (input and output). The mixed
|
||||
audio can be obtained by registering an "on_audio_data" event handler.
|
||||
The event handler will be called every time `buffer_size` is reached.
|
||||
|
||||
This processor manages audio buffering and synchronization, providing both merged and
|
||||
track-specific audio access through event handlers. It supports various audio configurations
|
||||
including sample rate conversion and mono/stereo output.
|
||||
You can provide the desired output `sample_rate` and incoming audio frames
|
||||
will resampled to match it. Also, you can provide the number of channels, 1
|
||||
for mono and 2 for stereo. With mono audio user and bot audio will be mixed,
|
||||
in the case of stereo the left channel will be used for the user's audio and
|
||||
the right channel for the bot.
|
||||
|
||||
Events:
|
||||
on_audio_data: Triggered when buffer_size is reached, providing merged audio
|
||||
on_track_audio_data: Triggered when buffer_size is reached, providing separate tracks
|
||||
Most of the time, user audio will be a continuous stream but it's possible
|
||||
that in some cases only the spoken audio is sent. To accomodate for those
|
||||
cases make sure to set `user_continuous_stream` accordingly.
|
||||
|
||||
Args:
|
||||
sample_rate (Optional[int]): Desired output sample rate. If None, uses source rate
|
||||
num_channels (int): Number of channels (1 for mono, 2 for stereo). Defaults to 1
|
||||
buffer_size (int): Size of buffer before triggering events. 0 for no buffering
|
||||
user_continuous_stream (bool): Whether user audio is continuous or speech-only
|
||||
|
||||
Audio handling:
|
||||
- Mono output (num_channels=1): User and bot audio are mixed
|
||||
- Stereo output (num_channels=2): User audio on left, bot audio on right
|
||||
- Automatic resampling of incoming audio to match desired sample_rate
|
||||
- Silence insertion for non-continuous audio streams
|
||||
- Buffer synchronization between user and bot audio
|
||||
|
||||
Note:
|
||||
When user_continuous_stream is False, the processor expects only speech
|
||||
segments and will handle silence insertion between segments automatically.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -77,45 +65,21 @@ class AudioBufferProcessor(FrameProcessor):
|
||||
self._resampler = create_default_resampler()
|
||||
|
||||
self._register_event_handler("on_audio_data")
|
||||
self._register_event_handler("on_track_audio_data")
|
||||
|
||||
@property
|
||||
def sample_rate(self) -> int:
|
||||
"""Current sample rate of the audio processor.
|
||||
|
||||
Returns:
|
||||
int: The sample rate in Hz
|
||||
"""
|
||||
return self._sample_rate
|
||||
|
||||
@property
|
||||
def num_channels(self) -> int:
|
||||
"""Number of channels in the audio output.
|
||||
|
||||
Returns:
|
||||
int: Number of channels (1 for mono, 2 for stereo)
|
||||
"""
|
||||
return self._num_channels
|
||||
|
||||
def has_audio(self) -> bool:
|
||||
"""Check if both user and bot audio buffers contain data.
|
||||
|
||||
Returns:
|
||||
bool: True if both buffers contain audio data
|
||||
"""
|
||||
return self._buffer_has_audio(self._user_audio_buffer) and self._buffer_has_audio(
|
||||
self._bot_audio_buffer
|
||||
)
|
||||
|
||||
def merge_audio_buffers(self) -> bytes:
|
||||
"""Merge user and bot audio buffers into a single audio stream.
|
||||
|
||||
For mono output, audio is mixed. For stereo output, user audio is placed
|
||||
on the left channel and bot audio on the right channel.
|
||||
|
||||
Returns:
|
||||
bytes: Mixed audio data
|
||||
"""
|
||||
if self._num_channels == 1:
|
||||
return mix_audio(bytes(self._user_audio_buffer), bytes(self._bot_audio_buffer))
|
||||
elif self._num_channels == 2:
|
||||
@@ -126,23 +90,14 @@ class AudioBufferProcessor(FrameProcessor):
|
||||
return b""
|
||||
|
||||
async def start_recording(self):
|
||||
"""Start recording audio from both user and bot.
|
||||
|
||||
Initializes recording state and resets audio buffers.
|
||||
"""
|
||||
self._recording = True
|
||||
self._reset_recording()
|
||||
|
||||
async def stop_recording(self):
|
||||
"""Stop recording and trigger final audio data handlers.
|
||||
|
||||
Calls audio handlers with any remaining buffered audio before stopping.
|
||||
"""
|
||||
await self._call_on_audio_data_handler()
|
||||
self._recording = False
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
"""Process incoming audio frames and manage audio buffers."""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
# Update output sample rate if necessary.
|
||||
@@ -205,21 +160,10 @@ class AudioBufferProcessor(FrameProcessor):
|
||||
if not self.has_audio() or not self._recording:
|
||||
return
|
||||
|
||||
# Call original handler with merged audio
|
||||
merged_audio = self.merge_audio_buffers()
|
||||
await self._call_event_handler(
|
||||
"on_audio_data", merged_audio, self._sample_rate, self._num_channels
|
||||
)
|
||||
|
||||
# Call new handler with separate tracks
|
||||
await self._call_event_handler(
|
||||
"on_track_audio_data",
|
||||
bytes(self._user_audio_buffer),
|
||||
bytes(self._bot_audio_buffer),
|
||||
self._sample_rate,
|
||||
self._num_channels,
|
||||
)
|
||||
|
||||
self._reset_audio_buffers()
|
||||
|
||||
def _buffer_has_audio(self, buffer: bytearray) -> bool:
|
||||
|
||||
@@ -570,12 +570,10 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
# Process the streaming response
|
||||
CHUNK_SIZE = 1024
|
||||
|
||||
yield TTSStartedFrame()
|
||||
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
|
||||
if len(chunk) > 0:
|
||||
|
||||
async for chunk in response.content:
|
||||
if chunk:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
except Exception as e:
|
||||
|
||||
@@ -722,9 +722,7 @@ class GoogleLLMContext(OpenAILLMContext):
|
||||
|
||||
self.add_message(glm.Content(role="user", parts=parts))
|
||||
|
||||
def add_audio_frames_message(
|
||||
self, *, audio_frames: list[AudioRawFrame], text: str = "Audio follows"
|
||||
):
|
||||
def add_audio_frames_message(self, *, audio_frames: list[AudioRawFrame], text: str = None):
|
||||
if not audio_frames:
|
||||
return
|
||||
|
||||
@@ -733,9 +731,8 @@ class GoogleLLMContext(OpenAILLMContext):
|
||||
|
||||
parts = []
|
||||
data = b"".join(frame.audio for frame in audio_frames)
|
||||
# NOTE(aleix): According to the docs only text or inline_data should be needed.
|
||||
# (see https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference)
|
||||
parts.append(glm.Part(text=text))
|
||||
if text:
|
||||
parts.append(glm.Part(text=text))
|
||||
parts.append(
|
||||
glm.Part(
|
||||
inline_data=glm.Blob(
|
||||
|
||||
@@ -530,10 +530,8 @@ class OpenAITTSService(TTSService):
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
CHUNK_SIZE = 1024
|
||||
|
||||
yield TTSStartedFrame()
|
||||
async for chunk in r.iter_bytes(CHUNK_SIZE):
|
||||
async for chunk in r.iter_bytes(8192):
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
|
||||
@@ -323,8 +323,7 @@ class PlayHTHttpTTSService(TTSService):
|
||||
api_key: str,
|
||||
user_id: str,
|
||||
voice_url: str,
|
||||
voice_engine: str = "Play3.0-mini",
|
||||
protocol: str = "http", # Options: http, ws
|
||||
voice_engine: str = "Play3.0-mini-http", # Options: Play3.0-mini-http, Play3.0-mini-ws
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
@@ -338,24 +337,12 @@ class PlayHTHttpTTSService(TTSService):
|
||||
user_id=self._user_id,
|
||||
api_key=self._api_key,
|
||||
)
|
||||
|
||||
# Check if voice_engine contains protocol information (backward compatibility)
|
||||
if "-http" in voice_engine:
|
||||
# Extract the base engine name
|
||||
voice_engine = voice_engine.replace("-http", "")
|
||||
protocol = "http"
|
||||
elif "-ws" in voice_engine:
|
||||
# Extract the base engine name
|
||||
voice_engine = voice_engine.replace("-ws", "")
|
||||
protocol = "ws"
|
||||
|
||||
self._settings = {
|
||||
"language": self.language_to_service_language(params.language)
|
||||
if params.language
|
||||
else "english",
|
||||
"format": Format.FORMAT_WAV,
|
||||
"voice_engine": voice_engine,
|
||||
"protocol": protocol,
|
||||
"speed": params.speed,
|
||||
"seed": params.seed,
|
||||
}
|
||||
@@ -396,22 +383,19 @@ class PlayHTHttpTTSService(TTSService):
|
||||
|
||||
try:
|
||||
options = self._create_options()
|
||||
b = bytearray()
|
||||
in_header = True
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
playht_gen = self._client.tts(
|
||||
text,
|
||||
voice_engine=self._settings["voice_engine"],
|
||||
protocol=self._settings["protocol"],
|
||||
options=options,
|
||||
text, voice_engine=self._settings["voice_engine"], options=options
|
||||
)
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
yield TTSStartedFrame()
|
||||
|
||||
b = bytearray()
|
||||
in_header = True
|
||||
async for chunk in playht_gen:
|
||||
# skip the RIFF header.
|
||||
if in_header:
|
||||
@@ -426,10 +410,11 @@ class PlayHTHttpTTSService(TTSService):
|
||||
fh.read(size)
|
||||
(data, size) = struct.unpack("<4sI", fh.read(8))
|
||||
in_header = False
|
||||
elif len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
else:
|
||||
if len(chunk):
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error generating TTS: {e}")
|
||||
finally:
|
||||
|
||||
@@ -407,10 +407,10 @@ class RimeHttpTTSService(TTSService):
|
||||
yield TTSStartedFrame()
|
||||
|
||||
# Process the streaming response
|
||||
CHUNK_SIZE = 1024
|
||||
chunk_size = 8192
|
||||
|
||||
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
|
||||
if len(chunk) > 0:
|
||||
async for chunk in response.content.iter_chunked(chunk_size):
|
||||
if chunk:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
@@ -150,10 +150,8 @@ class XTTSService(TTSService):
|
||||
|
||||
yield TTSStartedFrame()
|
||||
|
||||
CHUNK_SIZE = 1024
|
||||
|
||||
buffer = bytearray()
|
||||
async for chunk in r.content.iter_chunked(CHUNK_SIZE):
|
||||
async for chunk in r.content.iter_chunked(1024):
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
# Append new chunk to the buffer.
|
||||
|
||||
@@ -232,9 +232,6 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self.push_frame(BotStoppedSpeakingFrame())
|
||||
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
self._bot_speaking = False
|
||||
# Clean audio buffer (there could be tiny left overs if not multiple
|
||||
# to our output chunk size).
|
||||
self._audio_buffer = bytearray()
|
||||
|
||||
#
|
||||
# Sink tasks
|
||||
|
||||
Reference in New Issue
Block a user