Compare commits

..

1 Commits

Author SHA1 Message Date
James Hush
230d92850a example: realtime with transcripts 2025-02-26 16:29:07 +08:00
16 changed files with 68 additions and 530 deletions

View File

@@ -5,13 +5,10 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.58] - 2025-02-26
## [Unreleased]
### Added
- Added track-specific audio event `on_track_audio_data` to
`AudioBufferProcessor` for accessing separate input and output audio tracks.
- Pipecat version will now be logged on every application startup. This will
help us identify what version we are running in case of any issues.
@@ -48,10 +45,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- ⚠️ `PipelineTask` now requires keyword arguments (except for the first one for
the pipeline).
- Updated `PlayHTHttpTTSService` to take a `voice_engine` and `protocol` input
in the constructor. The previous method of providing a `voice_engine` input
that contains the engine and protocol is deprecated by PlayHT.
- The base `TTSService` class now strips leading newlines before sending text
to the TTS provider. This change is to solve issues where some TTS providers,
like Azure, would not output text due to newlines.
@@ -85,9 +78,6 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
### Fixed
- Fixed a `GoogleLLMService` that was causing an exception when sending inline
audio in some cases.
- Fixed an `AudioContextWordTTSService` issue that would cause an `EndFrame` to
disconnect from the TTS service before audio from all the contexts was
received. This affected services like Cartesia and Rime.
@@ -134,9 +124,6 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
- Added Gemini support to `examples/phone-chatbot`.
- Added foundational example `34-audio-recording.py` showing how to use the
AudioBufferProcessor callbacks to save merged and track recordings.
## [0.0.57] - 2025-02-14
### Added

View File

@@ -1,103 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.elevenlabs import ElevenLabsHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = ElevenLabsHttpTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
aiohttp_session=session,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,103 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.services.rime import RimeHttpTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = RimeHttpTTSService(
api_key=os.getenv("RIME_API_KEY", ""),
voice_id="rex",
aiohttp_session=session,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -16,10 +16,13 @@ from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai_realtime_beta import (
InputAudioTranscription,
OpenAIRealtimeBetaLLMService,
@@ -140,14 +143,22 @@ Remember, your responses should be short. Just one or two sentences, usually."""
tools,
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Create transcript processor and handler
transcript = TranscriptProcessor()
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
transcript.user(), # User transcripts
context_aggregator.user(),
llm, # LLM
context_aggregator.assistant(),
transcript.assistant(), # Assistant transcripts
transport.output(), # Transport bot output
]
)
@@ -162,9 +173,16 @@ Remember, your responses should be short. Just one or two sentences, usually."""
),
)
# Register event handler for transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
for msg in frame.messages:
logger.debug(msg)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -389,7 +389,7 @@ class AudioAccumulator(FrameProcessor):
)
self._user_speaking = False
context = GoogleLLMContext()
context.add_audio_frames_message(audio_frames=self._audio_frames)
context.add_audio_frames_message(text="Audio follows", audio_frames=self._audio_frames)
await self.push_frame(OpenAILLMContextFrame(context=context))
elif isinstance(frame, InputAudioRawFrame):
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest

View File

@@ -1,186 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Audio Recording Example with Pipecat.
This example demonstrates how to record audio from a conversation between a user and an AI assistant,
saving both merged and individual audio tracks. It showcases the AudioBufferProcessor's capabilities
to handle both combined and separate audio streams.
The example:
1. Sets up a basic conversation with an AI assistant
2. Records the entire conversation
3. Saves three separate WAV files:
- A merged recording of both participants
- Individual recording of user audio
- Individual recording of assistant audio
Example usage (run from pipecat root directory):
$ pip install "pipecat-ai[daily,openai,cartesia,silero]"
$ pip install -r dev-requirements.txt
$ python examples/foundational/34-audio-recording.py
Requirements:
- OpenAI API key (for GPT-4)
- Cartesia API key (for text-to-speech)
- Daily API key (for video/audio transport)
Environment variables (.env file):
OPENAI_API_KEY=your_openai_key
CARTESIA_API_KEY=your_cartesia_key
DAILY_API_KEY=your_daily_key
The recordings will be saved in a 'recordings' directory with timestamps:
recordings/
merged_20240315_123456.wav (Combined audio)
user_20240315_123456.wav (User audio only)
bot_20240315_123456.wav (Bot audio only)
Note:
This example requires the AudioBufferProcessor with track-specific audio support,
which provides both 'on_audio_data' and 'on_track_audio_data' events for
handling merged and separate audio tracks respectively.
"""
import asyncio
import datetime
import io
import os
import sys
import wave
import aiofiles
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def save_audio_file(audio: bytes, filename: str, sample_rate: int, num_channels: int):
"""Save audio data to a WAV file."""
if len(audio) > 0:
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(audio)
async with aiofiles.open(filename, "wb") as file:
await file.write(buffer.getvalue())
logger.info(f"Audio saved to {filename}")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Recording bot",
DailyParams(
# audio_in_enabled=True,
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True, # Enable audio passthrough for recording
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4")
# Create audio buffer processor
audiobuffer = AudioBufferProcessor()
messages = [
{
"role": "system",
"content": "You are a helpful assistant demonstrating audio recording capabilities. Keep your responses brief and clear.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
audiobuffer, # Add audio buffer to pipeline
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await audiobuffer.start_recording()
messages.append(
{
"role": "system",
"content": "Greet the user and explain that this conversation will be recorded.",
}
)
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await audiobuffer.stop_recording()
await task.cancel()
# Handler for merged audio
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"recordings/merged_{timestamp}.wav"
os.makedirs("recordings", exist_ok=True)
await save_audio_file(audio, filename, sample_rate, num_channels)
# Handler for separate tracks
@audiobuffer.event_handler("on_track_audio_data")
async def on_track_audio_data(buffer, user_audio, bot_audio, sample_rate, num_channels):
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
os.makedirs("recordings", exist_ok=True)
# Save user audio
user_filename = f"recordings/user_{timestamp}.wav"
await save_audio_file(user_audio, user_filename, sample_rate, 1)
# Save bot audio
bot_filename = f"recordings/bot_{timestamp}.wav"
await save_audio_file(bot_audio, bot_filename, sample_rate, 1)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -7,15 +7,20 @@ import argparse
import asyncio
import os
import sys
from dataclasses import dataclass
from typing import Optional
import google.ai.generativelanguage as glm
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
EndTaskFrame,
Frame,
InputAudioRawFrame,
SystemFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
@@ -23,11 +28,11 @@ from pipecat.frames.frames import (
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.ai_services import LLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.google import GoogleLLMService
from pipecat.services.google.google import GoogleLLMContext
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
from pipecat.transports.services.daily import DailyDialinSettings, DailyParams, DailyTransport
load_dotenv(override=True)
@@ -235,7 +240,7 @@ If it sounds like a human (saying hello, asking questions, etc.), call the funct
DO NOT say anything until you've determined if this is a voicemail or human."""
llm = GoogleLLMService(
model="models/gemini-2.0-flash-lite",
model="models/gemini-2.0-flash-lite-preview-02-05",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,

View File

@@ -20,14 +20,17 @@ classifiers = [
"Topic :: Scientific/Engineering :: Artificial Intelligence"
]
dependencies = [
"aiohttp~=3.11.13",
"aiohttp~=3.11.11",
"audioop-lts~=0.2.1; python_version>='3.13'",
# We need an older version of `httpx` that doesn't remove the deprecated
# `proxies` argument. This is necessary for Azure and Anthropic clients.
"httpx~=0.27.2",
"loguru~=0.7.3",
"Markdown~=3.7",
"numpy~=1.26.4",
"Pillow~=11.1.0",
"protobuf~=5.29.3",
"pydantic~=2.10.6",
"pydantic~=2.10.5",
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"soxr~=0.5.0",
@@ -39,7 +42,7 @@ Source = "https://github.com/pipecat-ai/pipecat"
Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.47.2" ]
anthropic = [ "anthropic~=0.45.2" ]
assemblyai = [ "assemblyai~=0.36.0" ]
aws = [ "boto3~=1.35.99" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
@@ -47,13 +50,13 @@ canonical = [ "aiofiles~=24.1.0" ]
cartesia = [ "cartesia~=1.3.1", "websockets~=13.1" ]
cerebras = []
deepseek = []
daily = [ "daily-python~=0.15.0" ]
daily = [ "daily-python~=0.14.2" ]
deepgram = [ "deepgram-sdk~=3.8.0" ]
elevenlabs = [ "websockets~=13.1" ]
fal = [ "fal-client~=0.5.6" ]
fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ]
gladia = [ "websockets~=13.1" ]
google = [ "google-cloud-speech~=2.31.0", "google-cloud-texttospeech~=2.25.0", "google-genai~=1.3.0", "google-generativeai~=0.8.4" ]
google = [ "google-cloud-speech~=2.31.0", "google-cloud-texttospeech~=2.25.0", "google-genai~=1.2.0", "google-generativeai~=0.8.4" ]
grok = []
groq = []
gstreamer = [ "pygobject~=3.50.0" ]

View File

@@ -21,32 +21,20 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class AudioBufferProcessor(FrameProcessor):
"""Processes and buffers audio frames from both input (user) and output (bot) sources.
"""This processor buffers audio raw frames (input and output). The mixed
audio can be obtained by registering an "on_audio_data" event handler.
The event handler will be called every time `buffer_size` is reached.
This processor manages audio buffering and synchronization, providing both merged and
track-specific audio access through event handlers. It supports various audio configurations
including sample rate conversion and mono/stereo output.
You can provide the desired output `sample_rate` and incoming audio frames
will resampled to match it. Also, you can provide the number of channels, 1
for mono and 2 for stereo. With mono audio user and bot audio will be mixed,
in the case of stereo the left channel will be used for the user's audio and
the right channel for the bot.
Events:
on_audio_data: Triggered when buffer_size is reached, providing merged audio
on_track_audio_data: Triggered when buffer_size is reached, providing separate tracks
Most of the time, user audio will be a continuous stream but it's possible
that in some cases only the spoken audio is sent. To accomodate for those
cases make sure to set `user_continuous_stream` accordingly.
Args:
sample_rate (Optional[int]): Desired output sample rate. If None, uses source rate
num_channels (int): Number of channels (1 for mono, 2 for stereo). Defaults to 1
buffer_size (int): Size of buffer before triggering events. 0 for no buffering
user_continuous_stream (bool): Whether user audio is continuous or speech-only
Audio handling:
- Mono output (num_channels=1): User and bot audio are mixed
- Stereo output (num_channels=2): User audio on left, bot audio on right
- Automatic resampling of incoming audio to match desired sample_rate
- Silence insertion for non-continuous audio streams
- Buffer synchronization between user and bot audio
Note:
When user_continuous_stream is False, the processor expects only speech
segments and will handle silence insertion between segments automatically.
"""
def __init__(
@@ -77,45 +65,21 @@ class AudioBufferProcessor(FrameProcessor):
self._resampler = create_default_resampler()
self._register_event_handler("on_audio_data")
self._register_event_handler("on_track_audio_data")
@property
def sample_rate(self) -> int:
"""Current sample rate of the audio processor.
Returns:
int: The sample rate in Hz
"""
return self._sample_rate
@property
def num_channels(self) -> int:
"""Number of channels in the audio output.
Returns:
int: Number of channels (1 for mono, 2 for stereo)
"""
return self._num_channels
def has_audio(self) -> bool:
"""Check if both user and bot audio buffers contain data.
Returns:
bool: True if both buffers contain audio data
"""
return self._buffer_has_audio(self._user_audio_buffer) and self._buffer_has_audio(
self._bot_audio_buffer
)
def merge_audio_buffers(self) -> bytes:
"""Merge user and bot audio buffers into a single audio stream.
For mono output, audio is mixed. For stereo output, user audio is placed
on the left channel and bot audio on the right channel.
Returns:
bytes: Mixed audio data
"""
if self._num_channels == 1:
return mix_audio(bytes(self._user_audio_buffer), bytes(self._bot_audio_buffer))
elif self._num_channels == 2:
@@ -126,23 +90,14 @@ class AudioBufferProcessor(FrameProcessor):
return b""
async def start_recording(self):
"""Start recording audio from both user and bot.
Initializes recording state and resets audio buffers.
"""
self._recording = True
self._reset_recording()
async def stop_recording(self):
"""Stop recording and trigger final audio data handlers.
Calls audio handlers with any remaining buffered audio before stopping.
"""
await self._call_on_audio_data_handler()
self._recording = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming audio frames and manage audio buffers."""
await super().process_frame(frame, direction)
# Update output sample rate if necessary.
@@ -205,21 +160,10 @@ class AudioBufferProcessor(FrameProcessor):
if not self.has_audio() or not self._recording:
return
# Call original handler with merged audio
merged_audio = self.merge_audio_buffers()
await self._call_event_handler(
"on_audio_data", merged_audio, self._sample_rate, self._num_channels
)
# Call new handler with separate tracks
await self._call_event_handler(
"on_track_audio_data",
bytes(self._user_audio_buffer),
bytes(self._bot_audio_buffer),
self._sample_rate,
self._num_channels,
)
self._reset_audio_buffers()
def _buffer_has_audio(self, buffer: bytearray) -> bool:

View File

@@ -570,12 +570,10 @@ class ElevenLabsHttpTTSService(TTSService):
await self.start_tts_usage_metrics(text)
# Process the streaming response
CHUNK_SIZE = 1024
yield TTSStartedFrame()
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
if len(chunk) > 0:
async for chunk in response.content:
if chunk:
await self.stop_ttfb_metrics()
yield TTSAudioRawFrame(chunk, self.sample_rate, 1)
except Exception as e:

View File

@@ -722,9 +722,7 @@ class GoogleLLMContext(OpenAILLMContext):
self.add_message(glm.Content(role="user", parts=parts))
def add_audio_frames_message(
self, *, audio_frames: list[AudioRawFrame], text: str = "Audio follows"
):
def add_audio_frames_message(self, *, audio_frames: list[AudioRawFrame], text: str = None):
if not audio_frames:
return
@@ -733,9 +731,8 @@ class GoogleLLMContext(OpenAILLMContext):
parts = []
data = b"".join(frame.audio for frame in audio_frames)
# NOTE(aleix): According to the docs only text or inline_data should be needed.
# (see https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference)
parts.append(glm.Part(text=text))
if text:
parts.append(glm.Part(text=text))
parts.append(
glm.Part(
inline_data=glm.Blob(

View File

@@ -530,10 +530,8 @@ class OpenAITTSService(TTSService):
await self.start_tts_usage_metrics(text)
CHUNK_SIZE = 1024
yield TTSStartedFrame()
async for chunk in r.iter_bytes(CHUNK_SIZE):
async for chunk in r.iter_bytes(8192):
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)

View File

@@ -323,8 +323,7 @@ class PlayHTHttpTTSService(TTSService):
api_key: str,
user_id: str,
voice_url: str,
voice_engine: str = "Play3.0-mini",
protocol: str = "http", # Options: http, ws
voice_engine: str = "Play3.0-mini-http", # Options: Play3.0-mini-http, Play3.0-mini-ws
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
@@ -338,24 +337,12 @@ class PlayHTHttpTTSService(TTSService):
user_id=self._user_id,
api_key=self._api_key,
)
# Check if voice_engine contains protocol information (backward compatibility)
if "-http" in voice_engine:
# Extract the base engine name
voice_engine = voice_engine.replace("-http", "")
protocol = "http"
elif "-ws" in voice_engine:
# Extract the base engine name
voice_engine = voice_engine.replace("-ws", "")
protocol = "ws"
self._settings = {
"language": self.language_to_service_language(params.language)
if params.language
else "english",
"format": Format.FORMAT_WAV,
"voice_engine": voice_engine,
"protocol": protocol,
"speed": params.speed,
"seed": params.seed,
}
@@ -396,22 +383,19 @@ class PlayHTHttpTTSService(TTSService):
try:
options = self._create_options()
b = bytearray()
in_header = True
await self.start_ttfb_metrics()
playht_gen = self._client.tts(
text,
voice_engine=self._settings["voice_engine"],
protocol=self._settings["protocol"],
options=options,
text, voice_engine=self._settings["voice_engine"], options=options
)
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
b = bytearray()
in_header = True
async for chunk in playht_gen:
# skip the RIFF header.
if in_header:
@@ -426,10 +410,11 @@ class PlayHTHttpTTSService(TTSService):
fh.read(size)
(data, size) = struct.unpack("<4sI", fh.read(8))
in_header = False
elif len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
else:
if len(chunk):
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
except Exception as e:
logger.error(f"{self} error generating TTS: {e}")
finally:

View File

@@ -407,10 +407,10 @@ class RimeHttpTTSService(TTSService):
yield TTSStartedFrame()
# Process the streaming response
CHUNK_SIZE = 1024
chunk_size = 8192
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
if len(chunk) > 0:
async for chunk in response.content.iter_chunked(chunk_size):
if chunk:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame

View File

@@ -150,10 +150,8 @@ class XTTSService(TTSService):
yield TTSStartedFrame()
CHUNK_SIZE = 1024
buffer = bytearray()
async for chunk in r.content.iter_chunked(CHUNK_SIZE):
async for chunk in r.content.iter_chunked(1024):
if len(chunk) > 0:
await self.stop_ttfb_metrics()
# Append new chunk to the buffer.

View File

@@ -232,9 +232,6 @@ class BaseOutputTransport(FrameProcessor):
await self.push_frame(BotStoppedSpeakingFrame())
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking = False
# Clean audio buffer (there could be tiny left overs if not multiple
# to our output chunk size).
self._audio_buffer = bytearray()
#
# Sink tasks