Compare commits

..

1 Commits

Author SHA1 Message Date
James Hush
230d92850a example: realtime with transcripts 2025-02-26 16:29:07 +08:00
85 changed files with 433 additions and 3356 deletions

View File

@@ -1,8 +1,7 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.9.7
- repo: local
hooks:
- id: ruff
language_version: python3
args: [ --select, I, ]
- id: ruff-format
- id: ruff-format-hook
name: Check ruff formatting
entry: sh scripts/pre-commit.sh
language: system

View File

@@ -9,65 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added new base class `BaseObject` which is now the base class of
`FrameProcessor`, `PipelineRunner`, `PipelineTask` and `BaseTransport`. The
new `BaseObject` adds supports for event handlers.
- Added support for a unified format for specifying function calling across all
LLM services.
```python
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function])
```
- Added `speech_threshold` parameter to `GladiaSTTService`.
- Allow passing user (`user_kwargs`) and assistant (`assistant_kwargs`) context
aggregator parameters when using `create_context_aggregator()`. The values are
passed as a mapping that will then be converted to arguments.
- Added `speed` as an `InputParam` for both `ElevenLabsTTSService` and
`ElevenLabsHttpTTSService`.
- Added new `LLMFullResponseAggregator` to aggregate full LLM completions. At
every completion the `on_completion` event handler is triggered.
- Added a new frame, `RTVIServerMessageFrame`, and RTVI message
`RTVIServerMessage` which provides a generic mechanism for sending custom
messages from server to client. The `RTVIServerMessageFrame` is processed by
the `RTVIObserver` and will be delivered to the client's `onServerMessage`
callback or `ServerMessage` event.
- Added `GoogleLLMOpenAIBetaService` for Google LLM integration with an
OpenAI-compatible interface. Added foundational example
`14o-function-calling-gemini-openai-format.py`.
- Added `AzureRealtimeBetaLLMService` to support Azure's OpeanAI Realtime API. Added
foundational example `19a-azure-realtime-beta.py`.
## [0.0.58] - 2025-02-26
### Added
- Added track-specific audio event `on_track_audio_data` to
`AudioBufferProcessor` for accessing separate input and output audio tracks.
- Pipecat version will now be logged on every application startup. This will
help us identify what version we are running in case of any issues.
@@ -104,10 +45,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- ⚠️ `PipelineTask` now requires keyword arguments (except for the first one for
the pipeline).
- Updated `PlayHTHttpTTSService` to take a `voice_engine` and `protocol` input
in the constructor. The previous method of providing a `voice_engine` input
that contains the engine and protocol is deprecated by PlayHT.
- The base `TTSService` class now strips leading newlines before sending text
to the TTS provider. This change is to solve issues where some TTS providers,
like Azure, would not output text due to newlines.
@@ -141,9 +78,6 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
### Fixed
- Fixed a `GoogleLLMService` that was causing an exception when sending inline
audio in some cases.
- Fixed an `AudioContextWordTTSService` issue that would cause an `EndFrame` to
disconnect from the TTS service before audio from all the contexts was
received. This affected services like Cartesia and Rime.
@@ -190,9 +124,6 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
- Added Gemini support to `examples/phone-chatbot`.
- Added foundational example `34-audio-recording.py` showing how to use the
AudioBufferProcessor callbacks to save merged and track recordings.
## [0.0.57] - 2025-02-14
### Added

View File

@@ -113,8 +113,8 @@ async def main():
llm,
tts,
transport.output(),
canonical, # uploads audio buffer to Canonical AI for metrics
audio_buffer_processor, # captures audio into a buffer
canonical, # uploads audio buffer to Canonical AI for metrics
context_aggregator.assistant(),
]
)

View File

@@ -1,103 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.elevenlabs import ElevenLabsHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = ElevenLabsHttpTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
aiohttp_session=session,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,103 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.services.rime import RimeHttpTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = RimeHttpTTSService(
api_key=os.getenv("RIME_API_KEY", ""),
voice_id="rex",
aiohttp_session=session,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,137 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.google import GoogleLLMOpenAIBetaService
from pipecat.services.openai import OpenAILLMContext
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
await result_callback({"conditions": "nice", "temperature": "75"})
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = GoogleLLMOpenAIBetaService(api_key=os.getenv("GEMINI_API_KEY"))
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(
"get_current_weather", fetch_weather_from_api, start_callback=start_fetch_weather
)
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
)
]
messages = [
{
"role": "user",
"content": "Start a conversation with 'Hey there' to get the current weather.",
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -16,10 +16,13 @@ from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai_realtime_beta import (
InputAudioTranscription,
OpenAIRealtimeBetaLLMService,
@@ -140,14 +143,22 @@ Remember, your responses should be short. Just one or two sentences, usually."""
tools,
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Create transcript processor and handler
transcript = TranscriptProcessor()
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
transcript.user(), # User transcripts
context_aggregator.user(),
llm, # LLM
context_aggregator.assistant(),
transcript.assistant(), # Assistant transcripts
transport.output(), # Transport bot output
]
)
@@ -162,9 +173,16 @@ Remember, your responses should be short. Just one or two sentences, usually."""
),
)
# Register event handler for transcript updates
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
logger.debug(f"Received transcript update with {len(frame.messages)} new messages")
for msg in frame.messages:
logger.debug(msg)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -1,179 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
from datetime import datetime
import aiohttp
import websockets
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai_realtime_beta import (
AzureRealtimeBetaLLMService,
InputAudioTranscription,
SessionProperties,
TurnDetection,
)
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
tools = [
{
"type": "function",
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
}
]
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
vad_audio_passthrough=True,
),
)
session_properties = SessionProperties(
input_audio_transcription=InputAudioTranscription(),
# Set openai TurnDetection parameters. Not setting this at all will turn it
# on by default
# turn_detection=TurnDetection(silence_duration_ms=1000),
# Or set to False to disable openai turn detection and use transport VAD
# turn_detection=False,
# tools=tools,
instructions="""Your knowledge cutoff is 2023-10. You are a helpful and friendly AI.
Act like a human, but remember that you aren't a human and that you can't do human
things in the real world. Your voice and personality should be warm and engaging, with a lively and
playful tone.
If interacting in a non-English language, start by using the standard accent or dialect familiar to
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
even if you're asked about them.
-
You are participating in a voice conversation. Keep your responses concise, short, and to the point
unless specifically asked to elaborate on a topic.
Remember, your responses should be short. Just one or two sentences, usually.""",
)
llm = AzureRealtimeBetaLLMService(
api_key=os.getenv("AZURE_REALTIME_API_KEY"),
base_url=os.getenv("AZURE_REALTIME_BASE_URL"),
session_properties=session_properties,
start_audio_paused=False,
)
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
# Create a standard OpenAI LLM context object using the normal messages format. The
# OpenAIRealtimeBetaLLMService will convert this internally to messages that the
# openai WebSocket API can understand.
context = OpenAILLMContext(
[{"role": "user", "content": "Say hello!"}],
# [{"role": "user", "content": [{"type": "text", "text": "Say hello!"}]}],
# [
# {
# "role": "user",
# "content": [
# {"type": "text", "text": "Say"},
# {"type": "text", "text": "yo what's up!"},
# ],
# }
# ],
tools,
)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -23,6 +23,7 @@ from pipecat.frames.frames import (
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InputAudioRawFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
StartFrame,
StartInterruptionFrame,
@@ -36,7 +37,7 @@ from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator
from pipecat.processors.aggregators.llm_response import LLMResponseAggregator
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
@@ -388,7 +389,7 @@ class AudioAccumulator(FrameProcessor):
)
self._user_speaking = False
context = GoogleLLMContext()
context.add_audio_frames_message(audio_frames=self._audio_frames)
context.add_audio_frames_message(text="Audio follows", audio_frames=self._audio_frames)
await self.push_frame(OpenAILLMContextFrame(context=context))
elif isinstance(frame, InputAudioRawFrame):
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest
@@ -431,11 +432,7 @@ class CompletenessCheck(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, (EndFrame, CancelFrame)):
if self._idle_task:
await self.cancel_task(self._idle_task)
self._idle_task = None
elif isinstance(frame, UserStartedSpeakingFrame):
if isinstance(frame, UserStartedSpeakingFrame):
if self._idle_task:
await self.cancel_task(self._idle_task)
elif isinstance(frame, TextFrame) and frame.text.startswith("YES"):
@@ -477,11 +474,19 @@ class CompletenessCheck(FrameProcessor):
self._idle_task = None
class LLMAggregatorBuffer(LLMAssistantResponseAggregator):
class UserAggregatorBuffer(LLMResponseAggregator):
"""Buffers the output of the transcription LLM. Used by the bot output gate."""
def __init__(self, **kwargs):
super().__init__(expect_stripped_words=False)
super().__init__(
messages=None,
role=None,
start_frame=LLMFullResponseStartFrame,
end_frame=LLMFullResponseEndFrame,
accumulator_frame=TextFrame,
handle_interruptions=True,
expect_stripped_words=False,
)
self._transcription = ""
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -539,7 +544,7 @@ class OutputGate(FrameProcessor):
self,
notifier: BaseNotifier,
context: OpenAILLMContext,
llm_transcription_buffer: LLMAggregatorBuffer,
user_transcription_buffer: "UserAggregatorBuffer",
**kwargs,
):
super().__init__(**kwargs)
@@ -547,7 +552,7 @@ class OutputGate(FrameProcessor):
self._frames_buffer = []
self._notifier = notifier
self._context = context
self._transcription_buffer = llm_transcription_buffer
self._transcription_buffer = user_transcription_buffer
self._gate_task = None
def close_gate(self):
@@ -694,10 +699,10 @@ async def main():
conversation_audio_context_assembler = ConversationAudioContextAssembler(context=context)
llm_aggregator_buffer = LLMAggregatorBuffer()
user_aggregator_buffer = UserAggregatorBuffer()
bot_output_gate = OutputGate(
notifier=notifier, context=context, llm_transcription_buffer=llm_aggregator_buffer
notifier=notifier, context=context, user_transcription_buffer=user_aggregator_buffer
)
pipeline = Pipeline(
@@ -718,7 +723,7 @@ async def main():
],
[
tx_llm,
llm_aggregator_buffer,
user_aggregator_buffer,
],
)
],

View File

@@ -1,186 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Audio Recording Example with Pipecat.
This example demonstrates how to record audio from a conversation between a user and an AI assistant,
saving both merged and individual audio tracks. It showcases the AudioBufferProcessor's capabilities
to handle both combined and separate audio streams.
The example:
1. Sets up a basic conversation with an AI assistant
2. Records the entire conversation
3. Saves three separate WAV files:
- A merged recording of both participants
- Individual recording of user audio
- Individual recording of assistant audio
Example usage (run from pipecat root directory):
$ pip install "pipecat-ai[daily,openai,cartesia,silero]"
$ pip install -r dev-requirements.txt
$ python examples/foundational/34-audio-recording.py
Requirements:
- OpenAI API key (for GPT-4)
- Cartesia API key (for text-to-speech)
- Daily API key (for video/audio transport)
Environment variables (.env file):
OPENAI_API_KEY=your_openai_key
CARTESIA_API_KEY=your_cartesia_key
DAILY_API_KEY=your_daily_key
The recordings will be saved in a 'recordings' directory with timestamps:
recordings/
merged_20240315_123456.wav (Combined audio)
user_20240315_123456.wav (User audio only)
bot_20240315_123456.wav (Bot audio only)
Note:
This example requires the AudioBufferProcessor with track-specific audio support,
which provides both 'on_audio_data' and 'on_track_audio_data' events for
handling merged and separate audio tracks respectively.
"""
import asyncio
import datetime
import io
import os
import sys
import wave
import aiofiles
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def save_audio_file(audio: bytes, filename: str, sample_rate: int, num_channels: int):
"""Save audio data to a WAV file."""
if len(audio) > 0:
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(audio)
async with aiofiles.open(filename, "wb") as file:
await file.write(buffer.getvalue())
logger.info(f"Audio saved to {filename}")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Recording bot",
DailyParams(
# audio_in_enabled=True,
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True, # Enable audio passthrough for recording
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4")
# Create audio buffer processor
audiobuffer = AudioBufferProcessor()
messages = [
{
"role": "system",
"content": "You are a helpful assistant demonstrating audio recording capabilities. Keep your responses brief and clear.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
audiobuffer, # Add audio buffer to pipeline
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await audiobuffer.start_recording()
messages.append(
{
"role": "system",
"content": "Greet the user and explain that this conversation will be recorded.",
}
)
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await audiobuffer.stop_recording()
await task.cancel()
# Handler for merged audio
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"recordings/merged_{timestamp}.wav"
os.makedirs("recordings", exist_ok=True)
await save_audio_file(audio, filename, sample_rate, num_channels)
# Handler for separate tracks
@audiobuffer.event_handler("on_track_audio_data")
async def on_track_audio_data(buffer, user_audio, bot_audio, sample_rate, num_channels):
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
os.makedirs("recordings", exist_ok=True)
# Save user audio
user_filename = f"recordings/user_{timestamp}.wav"
await save_audio_file(user_audio, user_filename, sample_rate, 1)
# Save bot audio
bot_filename = f"recordings/bot_{timestamp}.wav"
await save_audio_file(bot_audio, bot_filename, sample_rate, 1)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -7,17 +7,20 @@ import argparse
import asyncio
import os
import sys
from dataclasses import dataclass
from typing import Optional
import google.ai.generativelanguage as glm
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
EndFrame,
BotStoppedSpeakingFrame,
EndTaskFrame,
Frame,
InputAudioRawFrame,
StopTaskFrame,
SystemFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
@@ -25,17 +28,12 @@ from pipecat.frames.frames import (
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.ai_services import LLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.google import GoogleLLMService
from pipecat.services.google.google import GoogleLLMContext
from pipecat.transports.services.daily import (
DailyDialinSettings,
DailyParams,
DailyTransport,
)
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
from pipecat.transports.services.daily import DailyDialinSettings, DailyParams, DailyTransport
load_dotenv(override=True)
@@ -46,8 +44,6 @@ logger.add(sys.stderr, level="DEBUG")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
system_message = None
class UserAudioCollector(FrameProcessor):
"""This FrameProcessor collects audio frames in a buffer, then adds them to the
@@ -121,13 +117,7 @@ class FunctionHandlers:
self.context_switcher = context_switcher
async def voicemail_response(
self,
function_name,
tool_call_id,
args,
llm: LLMService,
context,
result_callback,
self, function_name, tool_call_id, args, llm, context, result_callback
):
"""Function the bot can call to leave a voicemail message."""
message = """You are Chatbot leaving a voicemail message. Say EXACTLY this message and nothing else:
@@ -137,48 +127,62 @@ class FunctionHandlers:
After saying this message, call the terminate_call function."""
await self.context_switcher.switch_context(system_instruction=message)
await result_callback("Leaving a voicemail message")
async def human_conversation(
self,
function_name,
tool_call_id,
args,
llm: LLMService,
context,
result_callback,
self, function_name, tool_call_id, args, llm, context, result_callback
):
"""Function the bot can when it detects it's talking to a human."""
await llm.push_frame(StopTaskFrame(), FrameDirection.UPSTREAM)
message = """You are Chatbot talking to a human. Be friendly and helpful.
Start with: "Hello! I'm a friendly chatbot. How can I help you today?"
Keep your responses brief and to the point. Listen to what the person says.
When the person indicates they're done with the conversation by saying something like:
- "Goodbye"
- "That's all"
- "I'm done"
- "Thank you, that's all I needed"
THEN say: "Thank you for chatting. Goodbye!" and call the terminate_call function."""
await self.context_switcher.switch_context(system_instruction=message)
await result_callback("Talking to the customer")
async def terminate_call(
function_name,
tool_call_id,
args,
llm: LLMService,
context,
result_callback,
call_state=None,
function_name, tool_call_id, args, llm: LLMService, context, result_callback
):
"""Function the bot can call to terminate the call upon completion of the call."""
if call_state:
call_state.bot_terminated_call = True
await llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
await llm.queue_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
async def main(
room_url: str,
token: str,
callId: Optional[str],
callDomain: Optional[str],
callId: str,
callDomain: str,
detect_voicemail: bool,
dialout_number: Optional[str],
):
# dialin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
# We don't want to specify dial-in settings if we're not dialing in
dialin_settings = None
if callId and callDomain:
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
transport_params = DailyParams(
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
dialin_settings=dialin_settings,
@@ -188,30 +192,8 @@ async def main(
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
else:
transport_params = DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
class CallState:
participant_left_early = False
bot_terminated_call = False
call_state = CallState()
transport = DailyTransport(
room_url,
token,
"Chatbot",
transport_params,
# transcription_enabled=True,
),
)
tts = ElevenLabsTTSService(
@@ -219,10 +201,6 @@ async def main(
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
### VOICEMAIL PIPELINE
tools = [
{
"function_declarations": [
@@ -244,67 +222,55 @@ async def main(
system_instruction = """You are Chatbot trying to determine if this is a voicemail system or a human.
If you hear any of these phrases (or very similar ones):
- "Please leave a message after the beep"
- "No one is available to take your call"
- "Record your message after the tone"
- "You have reached voicemail for..."
- "You have reached [phone number]"
- "[phone number] is unavailable"
- "The person you are trying to reach..."
- "The number you have dialed..."
- "Your call has been forwarded to an automated voice messaging system"
If you hear any of these phrases (or very similar ones):
- "Please leave a message after the beep"
- "No one is available to take your call"
- "Record your message after the tone"
- "You have reached voicemail for..."
- "You have reached [phone number]"
- "[phone number] is unavailable"
- "The person you are trying to reach..."
- "The number you have dialed..."
- "Your call has been forwarded to an automated voice messaging system"
Then call the function switch_to_voicemail_response.
Then call the function switch_to_voicemail_response.
If it sounds like a human (saying hello, asking questions, etc.), call the function switch_to_human_conversation.
If it sounds like a human (saying hello, asking questions, etc.), call the function switch_to_human_conversation.
DO NOT say anything until you've determined if this is a voicemail or human."""
DO NOT say anything until you've determined if this is a voicemail or human."""
voicemail_detection_llm = GoogleLLMService(
model="models/gemini-2.0-flash-lite",
llm = GoogleLLMService(
model="models/gemini-2.0-flash-lite-preview-02-05",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,
)
voicemail_detection_context = GoogleLLMContext()
voicemail_detection_context_aggregator = voicemail_detection_llm.create_context_aggregator(
voicemail_detection_context
)
context_switcher = ContextSwitcher(
voicemail_detection_llm, voicemail_detection_context_aggregator.user()
)
context = GoogleLLMContext()
context_aggregator = llm.create_context_aggregator(context)
audio_collector = UserAudioCollector(context, context_aggregator.user())
context_switcher = ContextSwitcher(llm, context_aggregator.user())
handlers = FunctionHandlers(context_switcher)
voicemail_detection_llm.register_function(
"switch_to_voicemail_response", handlers.voicemail_response
)
voicemail_detection_llm.register_function(
"switch_to_human_conversation", handlers.human_conversation
)
voicemail_detection_llm.register_function(
"terminate_call",
lambda *args, **kwargs: terminate_call(*args, **kwargs, call_state=call_state),
)
llm.register_function("switch_to_voicemail_response", handlers.voicemail_response)
llm.register_function("switch_to_human_conversation", handlers.human_conversation)
llm.register_function("terminate_call", terminate_call)
voicemail_detection_audio_collector = UserAudioCollector(
voicemail_detection_context, voicemail_detection_context_aggregator.user()
)
voicemail_detection_pipeline = Pipeline(
pipeline = Pipeline(
[
transport.input(), # Transport user input
voicemail_detection_audio_collector, # Collect audio frames
voicemail_detection_context_aggregator.user(), # User responses
voicemail_detection_llm, # LLM
audio_collector, # Collect audio frames
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
voicemail_detection_context_aggregator.assistant(), # Assistant spoken responses
context_aggregator.assistant(), # Assistant spoken responses
]
)
voicemail_detection_pipeline_task = PipelineTask(
voicemail_detection_pipeline,
task = PipelineTask(
pipeline,
params=PipelineParams(allow_interruptions=True),
)
@@ -339,116 +305,25 @@ async def main(
# machine to say something like 'Leave a message after the beep', or for the user to say 'Hello?'.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
logger.debug("Detect voicemail; capturing participant transcription")
await transport.capture_participant_transcription(participant["id"])
else:
logger.debug("+++++ No dialout number; assuming dialin")
logger.debug("no dialout number; assuming dialin")
# Different handlers for dialin
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# This event is not firing for some reason
await transport.capture_participant_transcription(participant["id"])
dialin_instructions = """Always call the function switch_to_human_conversation"""
messages = [
{
"role": "system",
"content": dialin_instructions,
}
]
voicemail_detection_context_aggregator.user().set_messages(messages)
await voicemail_detection_pipeline_task.queue_frames(
[voicemail_detection_context_aggregator.user().get_context_frame()]
)
# For the dialin case, we want the bot to answer the phone and greet the user. We
# can prompt the bot to speak by putting the context into the pipeline.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
runner = PipelineRunner()
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
call_state.participant_left_early = True
await voicemail_detection_pipeline_task.queue_frame(EndFrame())
print("!!! starting voicemail detection pipeline")
await runner.run(voicemail_detection_pipeline_task)
print("!!! Done with voicemail detection pipeline")
if call_state.participant_left_early or call_state.bot_terminated_call:
if call_state.participant_left_early:
print("!!! Participant left early; terminating call")
elif call_state.bot_terminated_call:
print("!!! Bot terminated call; not proceeding to human conversation")
return
### HUMAN CONVERSATION PIPELINE
human_conversation_system_instruction = """You are Chatbot talking to a human. Be friendly and helpful.
Start with: "Hello! I'm a friendly chatbot. How can I help you today?"
Keep your responses brief and to the point. Listen to what the person says.
When the person indicates they're done with the conversation by saying something like:
- "Goodbye"
- "That's all"
- "I'm done"
- "Thank you, that's all I needed"
THEN say: "Thank you for chatting. Goodbye!" and call the terminate_call function."""
human_conversation_llm = GoogleLLMService(
model="models/gemini-2.0-flash-001",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=human_conversation_system_instruction,
tools=tools,
)
human_conversation_context = GoogleLLMContext()
human_conversation_context_aggregator = human_conversation_llm.create_context_aggregator(
human_conversation_context
)
human_conversation_llm.register_function(
"terminate_call",
lambda *args, **kwargs: terminate_call(*args, **kwargs, call_state=call_state),
)
human_conversation_pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
human_conversation_context_aggregator.user(), # User responses
human_conversation_llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
human_conversation_context_aggregator.assistant(), # Assistant spoken responses
]
)
human_conversation_pipeline_task = PipelineTask(
human_conversation_pipeline,
params=PipelineParams(allow_interruptions=True),
)
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await voicemail_detection_pipeline_task.queue_frame(EndFrame())
await human_conversation_pipeline_task.queue_frame(EndFrame())
print("!!! starting human conversation pipeline")
human_conversation_context_aggregator.user().set_messages(
[
{
"role": "system",
"content": human_conversation_system_instruction,
}
]
)
await human_conversation_pipeline_task.queue_frames(
[human_conversation_context_aggregator.user().get_context_frame()]
)
await runner.run(human_conversation_pipeline_task)
print("!!! Done with human conversation pipeline")
await runner.run(task)
if __name__ == "__main__":

View File

@@ -1,134 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.ai_services import LLMService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
load_dotenv(override=True)
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
await result_callback({"conditions": "nice", "temperature": "75"})
class WeatherBot:
"""Generic base class for setting up and running an LLM-powered bot."""
def __init__(self, llm: LLMService):
"""Initialize the base handler with a specific LLM."""
self.llm = llm
async def run(self):
"""Set up and start the processing pipeline."""
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
self.llm.register_function(
None, fetch_weather_from_api, start_callback=start_fetch_weather
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function])
messages = [
{
"role": "system",
"content": "You are a helpful assistant who can report the weather in any location in the universe. Respond concisely. Your response will be turned into speech so use only simple words and punctuation.",
},
{"role": "user", "content": " Start the conversation by introducing yourself."},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = self.llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
self.llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)

View File

@@ -1,126 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import sys
from typing import List
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.ai_services import LLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
load_dotenv(override=True)
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
await result_callback({"conditions": "nice", "temperature": "75"})
class MultimodalWeatherBot:
"""Generic base class for setting up and running an LLM-powered bot."""
def __init__(self, llm: LLMService):
"""Initialize the base handler with a specific LLM."""
self.llm = llm
@staticmethod
def tools() -> ToolsSchema:
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location"],
)
return ToolsSchema(standard_tools=[weather_function])
async def run(self):
"""Set up and start the processing pipeline."""
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
self.llm.register_function(
None, fetch_weather_from_api, start_callback=start_fetch_weather
)
messages = [
{
"role": "system",
"content": "You are a helpful assistant who can report the weather in any location in the universe. Respond concisely. Your response will be turned into speech so use only simple words and punctuation.",
},
{"role": "user", "content": " Start the conversation by introducing yourself."},
]
context = OpenAILLMContext(messages, MultimodalWeatherBot.tools())
context_aggregator = self.llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
self.llm,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)

View File

@@ -1,64 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
from typing import Optional
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
async def configure(aiohttp_session: aiohttp.ClientSession):
(url, token, _) = await configure_with_args(aiohttp_session)
return (url, token)
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession, parser: Optional[argparse.ArgumentParser] = None
):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
)
parser.add_argument(
"-k",
"--apikey",
type=str,
required=False,
help="Daily API Key (needed to create an owner token for the room)",
)
args, unknown = parser.parse_known_args()
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
key = args.apikey or os.getenv("DAILY_API_KEY")
if not url:
raise Exception(
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
)
if not key:
raise Exception(
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
)
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session,
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token, args)

View File

@@ -1,29 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.anthropic import AnthropicLLMService
load_dotenv(override=True)
class AnthropicWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20240620"
)
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(AnthropicWeatherBot().run())

View File

@@ -1,31 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.azure import AzureLLMService
load_dotenv(override=True)
class AzureWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = AzureLLMService(
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"),
)
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(AzureWeatherBot().run())

View File

@@ -1,27 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.cerebras import CerebrasLLMService
load_dotenv(override=True)
class CerebrasWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = CerebrasLLMService(api_key=os.getenv("CEREBRAS_API_KEY"), model="llama-3.3-70b")
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(CerebrasWeatherBot().run())

View File

@@ -1,27 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.deepseek import DeepSeekLLMService
load_dotenv(override=True)
class DeepSeekWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = DeepSeekLLMService(api_key=os.getenv("DEEPSEEK_API_KEY"), model="deepseek-chat")
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(DeepSeekWeatherBot().run())

View File

@@ -1,29 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.fireworks import FireworksLLMService
load_dotenv(override=True)
class FireworksWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = FireworksLLMService(
api_key=os.getenv("FIREWORKS_API_KEY"),
)
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(FireworksWeatherBot().run())

View File

@@ -1,38 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from multimodal_base_function_calling import MultimodalWeatherBot
from pipecat.adapters.schemas.tools_schema import AdapterType
from pipecat.services.gemini_multimodal_live import GeminiMultimodalLiveLLMService
load_dotenv(override=True)
class GeminiMultimodalWeatherBot(MultimodalWeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
search_tool = {"google_search": {}}
tools_def = MultimodalWeatherBot.tools()
tools_def.custom_tools = {AdapterType.GEMINI: [search_tool]}
llm = GeminiMultimodalLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
voice_id="Puck",
transcribe_user_audio=True,
transcribe_model_audio=True,
tools=tools_def,
)
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(GeminiMultimodalWeatherBot().run())

View File

@@ -1,27 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.google import GoogleLLMService
load_dotenv(override=True)
class GeminiWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(GeminiWeatherBot().run())

View File

@@ -1,27 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.grok import GrokLLMService
load_dotenv(override=True)
class GrokWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = GrokLLMService(api_key=os.getenv("GROK_API_KEY"))
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(GrokWeatherBot().run())

View File

@@ -1,27 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.groq import GroqLLMService
load_dotenv(override=True)
class GroqWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = GroqLLMService(api_key=os.getenv("GROQ_API_KEY"), model="llama-3.3-70b-versatile")
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(GroqWeatherBot().run())

View File

@@ -1,29 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.nim import NimLLMService
load_dotenv(override=True)
class NimWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = NimLLMService(
api_key=os.getenv("NVIDIA_API_KEY"), model="meta/llama-3.3-70b-instruct"
)
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(NimWeatherBot().run())

View File

@@ -1,43 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from multimodal_base_function_calling import MultimodalWeatherBot
from pipecat.services.openai_realtime_beta import (
InputAudioTranscription,
OpenAIRealtimeBetaLLMService,
SessionProperties,
TurnDetection,
)
load_dotenv(override=True)
class OpenAiRealTimeWeatherBot(MultimodalWeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
session_properties = SessionProperties(
input_audio_transcription=InputAudioTranscription(),
# Set openai TurnDetection parameters. Not setting this at all will turn it
# on by default
turn_detection=TurnDetection(silence_duration_ms=1000),
)
llm = OpenAIRealtimeBetaLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
session_properties=session_properties,
start_audio_paused=False,
)
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(OpenAiRealTimeWeatherBot().run())

View File

@@ -1,27 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.openai import OpenAILLMService
load_dotenv(override=True)
class OpenAiWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(OpenAiWeatherBot().run())

View File

@@ -1,29 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.openrouter import OpenRouterLLMService
load_dotenv(override=True)
class OpenRouterWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = OpenRouterLLMService(
api_key=os.getenv("OPENROUTER_API_KEY"), model="openai/gpt-4o-2024-11-20"
)
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(OpenRouterWeatherBot().run())

View File

@@ -1,30 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from base_function_calling import WeatherBot
from dotenv import load_dotenv
from pipecat.services.together import TogetherLLMService
load_dotenv(override=True)
class TogetherWeatherBot(WeatherBot):
"""Main class defining the LLM and passing it to the base handler."""
def __init__(self):
llm = TogetherLLMService(
api_key=os.getenv("TOGETHER_API_KEY"),
model="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
)
super().__init__(llm)
if __name__ == "__main__":
asyncio.run(TogetherWeatherBot().run())

View File

@@ -20,14 +20,17 @@ classifiers = [
"Topic :: Scientific/Engineering :: Artificial Intelligence"
]
dependencies = [
"aiohttp~=3.11.13",
"aiohttp~=3.11.11",
"audioop-lts~=0.2.1; python_version>='3.13'",
# We need an older version of `httpx` that doesn't remove the deprecated
# `proxies` argument. This is necessary for Azure and Anthropic clients.
"httpx~=0.27.2",
"loguru~=0.7.3",
"Markdown~=3.7",
"numpy~=1.26.4",
"Pillow~=11.1.0",
"protobuf~=5.29.3",
"pydantic~=2.10.6",
"pydantic~=2.10.5",
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"soxr~=0.5.0",
@@ -39,7 +42,7 @@ Source = "https://github.com/pipecat-ai/pipecat"
Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.47.2" ]
anthropic = [ "anthropic~=0.45.2" ]
assemblyai = [ "assemblyai~=0.36.0" ]
aws = [ "boto3~=1.35.99" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
@@ -47,13 +50,13 @@ canonical = [ "aiofiles~=24.1.0" ]
cartesia = [ "cartesia~=1.3.1", "websockets~=13.1" ]
cerebras = []
deepseek = []
daily = [ "daily-python~=0.15.0" ]
daily = [ "daily-python~=0.14.2" ]
deepgram = [ "deepgram-sdk~=3.8.0" ]
elevenlabs = [ "websockets~=13.1" ]
fal = [ "fal-client~=0.5.6" ]
fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ]
gladia = [ "websockets~=13.1" ]
google = [ "google-cloud-speech~=2.31.0", "google-cloud-texttospeech~=2.25.0", "google-genai~=1.3.0", "google-generativeai~=0.8.4" ]
google = [ "google-cloud-speech~=2.31.0", "google-cloud-texttospeech~=2.25.0", "google-genai~=1.2.0", "google-generativeai~=0.8.4" ]
grok = []
groq = []
gstreamer = [ "pygobject~=3.50.0" ]
@@ -70,7 +73,7 @@ noisereduce = [ "noisereduce~=3.0.3" ]
openai = [ "websockets~=13.1" ]
openpipe = [ "openpipe~=4.45.0" ]
perplexity = []
playht = [ "pyht~=0.1.12", "websockets~=13.1" ]
playht = [ "pyht~=0.1.6", "websockets~=13.1" ]
rime = [ "websockets~=13.1" ]
riva = [ "nvidia-riva-client~=2.18.0" ]
sentry = [ "sentry-sdk~=2.20.0" ]

View File

@@ -1,4 +0,0 @@
ruff format src
ruff format examples
ruff format tests
ruff check --select I --fix

View File

@@ -1,22 +0,0 @@
from abc import ABC, abstractmethod
from typing import Any, List, Union, cast
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema
class BaseLLMAdapter(ABC):
@abstractmethod
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[Any]:
"""Converts tools to the provider's format."""
pass
def from_standard_tools(self, tools: Any) -> List[Any]:
if isinstance(tools, ToolsSchema):
logger.debug(f"Retrieving the tools using the adapter: {type(self)}")
return self.to_provider_tools_format(tools)
# Fallback to return the same tools in case they are not in a standard format
return tools
# TODO: we can move the logic to also handle the Messages here

View File

@@ -1,55 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Any, Dict, List
class FunctionSchema:
def __init__(
self, name: str, description: str, properties: Dict[str, Any], required: List[str]
) -> None:
"""Standardized function schema representation.
:param name: Name of the function.
:param description: Description of the function.
:param properties: Dictionary defining properties types and descriptions.
:param required: List of required parameters.
"""
self._name = name
self._description = description
self._properties = properties
self._required = required
def to_default_dict(self) -> Dict[str, Any]:
"""Converts the function schema to a dictionary.
:return: Dictionary representation of the function schema.
"""
return {
"name": self._name,
"description": self._description,
"parameters": {
"type": "object",
"properties": self._properties,
"required": self._required,
},
}
@property
def name(self) -> str:
return self._name
@property
def description(self) -> str:
return self._description
@property
def properties(self) -> Dict[str, Any]:
return self._properties
@property
def required(self) -> List[str]:
return self._required

View File

@@ -1,43 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from enum import Enum
from typing import Any, Dict, List
from pipecat.adapters.schemas.function_schema import FunctionSchema
class AdapterType(Enum):
GEMINI = "gemini" # that is the only service where we are able to add custom tools for now
class ToolsSchema:
def __init__(
self,
standard_tools: List[FunctionSchema],
custom_tools: Dict[AdapterType, List[Dict[str, Any]]] = None,
) -> None:
"""
A schema for tools that includes both standardized function schemas
and custom tools that do not follow the FunctionSchema format.
:param standard_tools: List of tools following FunctionSchema.
:param custom_tools: List of tools in a custom format (e.g., search_tool).
"""
self._standard_tools = standard_tools
self._custom_tools = custom_tools
@property
def standard_tools(self) -> List[FunctionSchema]:
return self._standard_tools
@property
def custom_tools(self) -> Dict[AdapterType, List[Dict[str, Any]]]:
return self._custom_tools
@custom_tools.setter
def custom_tools(self, value: Dict[AdapterType, List[Dict[str, Any]]]) -> None:
self._custom_tools = value

View File

@@ -1,34 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Any, Dict, List, Union
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
class AnthropicLLMAdapter(BaseLLMAdapter):
@staticmethod
def _to_anthropic_function_format(function: FunctionSchema) -> Dict[str, Any]:
return {
"name": function.name,
"description": function.description,
"input_schema": {
"type": "object",
"properties": function.properties,
"required": function.required,
},
}
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[Dict[str, Any]]:
"""Converts function schemas to Anthropic's function-calling format.
:return: Anthropic formatted function call definition.
"""
functions_schema = tools_schema.standard_tools
return [self._to_anthropic_function_format(func) for func in functions_schema]

View File

@@ -1,28 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Any, Dict, List, Union
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
class GeminiLLMAdapter(BaseLLMAdapter):
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[Dict[str, Any]]:
"""Converts function schemas to Gemini's function-calling format.
:return: Gemini formatted function call definition.
"""
functions_schema = tools_schema.standard_tools
formatted_standard_tools = [
{"function_declarations": [func.to_default_dict() for func in functions_schema]}
]
custom_gemini_tools = []
if tools_schema.custom_tools:
custom_gemini_tools = tools_schema.custom_tools.get(AdapterType.GEMINI, [])
return formatted_standard_tools + custom_gemini_tools

View File

@@ -1,24 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import List
from openai.types.chat import ChatCompletionToolParam
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.tools_schema import ToolsSchema
class OpenAILLMAdapter(BaseLLMAdapter):
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[ChatCompletionToolParam]:
"""Converts function schemas to OpenAI's function-calling format.
:return: OpenAI formatted function call definition.
"""
functions_schema = tools_schema.standard_tools
return [
ChatCompletionToolParam(type="function", function=func.to_default_dict())
for func in functions_schema
]

View File

@@ -1,34 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Any, Dict, List, Union
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
@staticmethod
def _to_openai_realtime_function_format(function: FunctionSchema) -> Dict[str, Any]:
return {
"type": "function",
"name": function.name,
"description": function.description,
"parameters": {
"type": "object",
"properties": function.properties,
"required": function.required,
},
}
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[Dict[str, Any]]:
"""Converts function schemas to Openai Realtime function-calling format.
:return: Openai Realtime formatted function call definition.
"""
functions_schema = tools_schema.standard_tools
return [self._to_openai_realtime_function_format(func) for func in functions_schema]

View File

@@ -1,32 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional
class EndOfTurnState(Enum):
COMPLETE = 1
INCOMPLETE = 2
class BaseEndOfTurnAnalyzer(ABC):
def __init__(self, *, sample_rate: Optional[int] = None):
self._init_sample_rate = sample_rate
self._sample_rate = 0
@property
def sample_rate(self) -> int:
return self._sample_rate
def set_sample_rate(self, sample_rate: int):
self._sample_rate = self._init_sample_rate or sample_rate
@abstractmethod
def analyze_audio(self, buffer: bytes) -> EndOfTurnState:
pass

View File

@@ -1,83 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import numpy as np
import torch
from loguru import logger
from transformers import AutoFeatureExtractor, Wav2Vec2BertForSequenceClassification
from pipecat.audio.turn.base_turn_analyzer import BaseEndOfTurnAnalyzer, EndOfTurnState
# MODEL_PATH = "model-v1"
MODEL_PATH = "pipecat-ai/smart-turn"
class SmartTurnAnalyzer(BaseEndOfTurnAnalyzer):
def __init__(self):
super().__init__()
self._audio_buffer = bytearray()
logger.debug("Loading Smart Turn model...")
# Load model and processor
model = Wav2Vec2BertForSequenceClassification.from_pretrained(MODEL_PATH)
self._processor = AutoFeatureExtractor.from_pretrained(MODEL_PATH)
# Set model to evaluation mode and move to GPU if available
self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self._model = model.to(self._device)
self._model.eval()
logger.debug("Loaded Smart Turn")
def analyze_audio(self, buffer: bytes) -> EndOfTurnState:
self._audio_buffer += buffer
if len(self._audio_buffer) < 16000 * 2 * 6:
return EndOfTurnState.INCOMPLETE
audio_int16 = np.frombuffer(self._audio_buffer, dtype=np.int16)
# Divide by 32768 because we have signed 16-bit data.
audio_float32 = np.frombuffer(audio_int16, dtype=np.int16).astype(np.float32) / 32768.0
print(audio_float32)
# Process audio
inputs = self._processor(
audio_float32,
sampling_rate=16000,
padding="max_length",
truncation=True,
max_length=800, # Maximum length as specified in training
return_attention_mask=True,
return_tensors="pt",
)
# Move inputs to device
inputs = {k: v.to(self._device) for k, v in inputs.items()}
# Run inference
with torch.no_grad():
outputs = self._model(**inputs)
logits = outputs.logits
# Get probabilities using softmax
probabilities = torch.nn.functional.softmax(logits, dim=1)
completion_prob = probabilities[0, 1].item() # Probability of class 1 (Complete)
# Make prediction (1 for Complete, 0 for Incomplete)
prediction = 1 if completion_prob > 0.5 else 0
state = EndOfTurnState.COMPLETE if prediction == 1 else EndOfTurnState.INCOMPLETE
if state == EndOfTurnState.COMPLETE:
self._audio_buffer = bytearray()
else:
self._audio_buffer = self._audio_buffer[len(buffer) :]
print("AAAAAAAAAAAA", state)
return state

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from abc import abstractmethod
from enum import Enum
from typing import Optional
@@ -33,7 +33,7 @@ class VADParams(BaseModel):
min_volume: float = VAD_MIN_VOLUME
class VADAnalyzer(ABC):
class VADAnalyzer:
def __init__(self, *, sample_rate: Optional[int] = None, params: VADParams):
self._init_sample_rate = sample_rate
self._sample_rate = 0

View File

@@ -568,8 +568,7 @@ class UserStoppedSpeakingFrame(SystemFrame):
@dataclass
class EmulateUserStartedSpeakingFrame(SystemFrame):
"""Emitted by internal processors upstream to emulate VAD behavior when a
user starts speaking.
"""
user starts speaking."""
pass
@@ -577,20 +576,7 @@ class EmulateUserStartedSpeakingFrame(SystemFrame):
@dataclass
class EmulateUserStoppedSpeakingFrame(SystemFrame):
"""Emitted by internal processors upstream to emulate VAD behavior when a
user stops speaking.
"""
pass
@dataclass
class UserEndOfTurnFrame(SystemFrame):
"""Emitted by VAD to indicate that a user has started speaking. This can be
used for interruptions or other times when detecting that someone is
speaking is more important than knowing what they're saying (as you will
with a TranscriptionFrame)
"""
user stops speaking."""
pass

View File

@@ -5,14 +5,25 @@
#
import asyncio
from abc import abstractmethod
from abc import ABC, abstractmethod
from typing import AsyncIterable, Iterable
from pipecat.frames.frames import Frame
from pipecat.utils.base_object import BaseObject
class BaseTask(BaseObject):
class BaseTask(ABC):
@property
@abstractmethod
def id(self) -> int:
"""Returns the unique indetifier for this task."""
pass
@property
@abstractmethod
def name(self) -> str:
"""Returns the name of this task."""
pass
@abstractmethod
def set_event_loop(self, loop: asyncio.AbstractEventLoop):
"""Sets the event loop that this task will run on."""

View File

@@ -12,10 +12,10 @@ from typing import Optional
from loguru import logger
from pipecat.pipeline.task import PipelineTask
from pipecat.utils.base_object import BaseObject
from pipecat.utils.utils import obj_count, obj_id
class PipelineRunner(BaseObject):
class PipelineRunner:
def __init__(
self,
*,
@@ -24,7 +24,8 @@ class PipelineRunner(BaseObject):
force_gc: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None,
):
super().__init__(name=name)
self.id: int = obj_id()
self.name: str = name or f"{self.__class__.__name__}#{obj_count(self)}"
self._tasks = {}
self._sig_task = None
@@ -73,3 +74,6 @@ class PipelineRunner(BaseObject):
collected = gc.collect()
logger.debug(f"Garbage collector: collected {collected} objects.")
logger.debug(f"Garbage collector: uncollectable objects {gc.garbage}")
def __str__(self):
return self.name

View File

@@ -32,6 +32,7 @@ from pipecat.pipeline.base_task import BaseTask
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio import BaseTaskManager, TaskManager
from pipecat.utils.utils import obj_count, obj_id
HEARTBEAT_SECONDS = 1.0
HEARTBEAT_MONITOR_SECONDS = HEARTBEAT_SECONDS * 5
@@ -137,7 +138,9 @@ class PipelineTask(BaseTask):
task_manager: Optional[BaseTaskManager] = None,
check_dangling_tasks: bool = True,
):
super().__init__()
self._id: int = obj_id()
self._name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self._pipeline = pipeline
self._clock = clock
self._params = params
@@ -177,6 +180,16 @@ class PipelineTask(BaseTask):
self._observer = TaskObserver(observers=observers, task_manager=self._task_manager)
@property
def id(self) -> int:
"""Returns the unique indetifier for this task."""
return self._id
@property
def name(self) -> str:
"""Returns the name of this task."""
return self._name
@property
def params(self) -> PipelineParams:
"""Returns the pipeline parameters of this task."""
@@ -421,3 +434,6 @@ class PipelineTask(BaseTask):
tasks = [t.get_name() for t in self._task_manager.current_tasks()]
if tasks:
logger.warning(f"Dangling tasks detected: {tasks}")
def __str__(self):
return self.name

View File

@@ -13,6 +13,7 @@ from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.asyncio import BaseTaskManager
from pipecat.utils.utils import obj_count, obj_id
@dataclass
@@ -55,10 +56,20 @@ class TaskObserver(BaseObserver):
"""
def __init__(self, *, observers: List[BaseObserver] = [], task_manager: BaseTaskManager):
self._id: int = obj_id()
self._name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self._observers = observers
self._task_manager = task_manager
self._proxies: List[Proxy] = []
@property
def id(self) -> int:
return self._id
@property
def name(self) -> str:
return self._name
async def start(self):
"""Starts all proxy observer tasks."""
self._proxies = self._create_proxies(self._observers)
@@ -89,7 +100,7 @@ class TaskObserver(BaseObserver):
queue = asyncio.Queue()
task = self._task_manager.create_task(
self._proxy_task_handler(queue, observer),
f"TaskObserver::{observer.__class__.__name__}::_proxy_task_handler",
f"{self}::{observer.__class__.__name__}::_proxy_task_handler",
)
proxy = Proxy(queue=queue, task=task, observer=observer)
proxies.append(proxy)
@@ -101,3 +112,6 @@ class TaskObserver(BaseObserver):
await observer.on_push_frame(
data.src, data.dst, data.frame, data.direction, data.timestamp
)
def __str__(self):
return self.name

View File

@@ -22,7 +22,6 @@ from pipecat.frames.frames import (
LLMMessagesFrame,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
LLMTextFrame,
StartFrame,
StartInterruptionFrame,
TextFrame,
@@ -37,59 +36,6 @@ from pipecat.processors.aggregators.openai_llm_context import (
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class LLMFullResponseAggregator(FrameProcessor):
"""This is an LLM aggregator that aggregates a full LLM completion. It
aggregates LLM text frames (tokens) received between
`LLMFullResponseStartFrame` and `LLMFullResponseEndFrame`. Every full
completion is returned via the "on_completion" event handler:
@aggregator.event_handler("on_completion")
async def on_completion(
aggregator: LLMFullResponseAggregator,
completion: str,
completed: bool,
)
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._aggregation = ""
self._started = False
self._register_event_handler("on_completion")
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
await self._call_event_handler("on_completion", self._aggregation, False)
self._aggregation = ""
self._started = False
elif isinstance(frame, LLMFullResponseStartFrame):
await self._handle_llm_start(frame)
elif isinstance(frame, LLMFullResponseEndFrame):
await self._handle_llm_end(frame)
elif isinstance(frame, LLMTextFrame):
await self._handle_llm_text(frame)
await self.push_frame(frame, direction)
async def _handle_llm_start(self, _: LLMFullResponseStartFrame):
self._started = True
async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
await self._call_event_handler("on_completion", self._aggregation, True)
self._started = False
self._aggregation = ""
async def _handle_llm_text(self, frame: TextFrame):
if not self._started:
return
self._aggregation += frame.text
class BaseLLMResponseAggregator(FrameProcessor):
"""This is the base class for all LLM response aggregators. These
aggregators process incoming frames and aggregate content until they are

View File

@@ -20,8 +20,6 @@ from openai.types.chat import (
)
from PIL import Image
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import (
AudioRawFrame,
Frame,
@@ -46,20 +44,13 @@ class OpenAILLMContext:
def __init__(
self,
messages: Optional[List[ChatCompletionMessageParam]] = None,
tools: List[ChatCompletionToolParam] | NotGiven | ToolsSchema = NOT_GIVEN,
tools: List[ChatCompletionToolParam] | NotGiven = NOT_GIVEN,
tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven = NOT_GIVEN,
):
self._messages: List[ChatCompletionMessageParam] = messages if messages else []
self._tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven = tool_choice
self._tools: List[ChatCompletionToolParam] | NotGiven | ToolsSchema = tools
self._tools: List[ChatCompletionToolParam] | NotGiven = tools
self._user_image_request_context = {}
self._llm_adapter: Optional[BaseLLMAdapter] = None
def get_llm_adapter(self) -> Optional[BaseLLMAdapter]:
return self._llm_adapter
def set_llm_adapter(self, llm_adapter: BaseLLMAdapter):
self._llm_adapter = llm_adapter
@staticmethod
def from_messages(messages: List[dict]) -> "OpenAILLMContext":
@@ -76,9 +67,7 @@ class OpenAILLMContext:
return self._messages
@property
def tools(self) -> List[ChatCompletionToolParam] | NotGiven | List[Any]:
if self._llm_adapter:
return self._llm_adapter.from_standard_tools(self._tools)
def tools(self) -> List[ChatCompletionToolParam] | NotGiven:
return self._tools
@property
@@ -163,7 +152,7 @@ class OpenAILLMContext:
def set_tool_choice(self, tool_choice: ChatCompletionToolChoiceOptionParam | NotGiven):
self._tool_choice = tool_choice
def set_tools(self, tools: List[ChatCompletionToolParam] | NotGiven | ToolsSchema = NOT_GIVEN):
def set_tools(self, tools: List[ChatCompletionToolParam] | NotGiven = NOT_GIVEN):
if tools != NOT_GIVEN and len(tools) == 0:
tools = NOT_GIVEN
self._tools = tools

View File

@@ -21,32 +21,20 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class AudioBufferProcessor(FrameProcessor):
"""Processes and buffers audio frames from both input (user) and output (bot) sources.
"""This processor buffers audio raw frames (input and output). The mixed
audio can be obtained by registering an "on_audio_data" event handler.
The event handler will be called every time `buffer_size` is reached.
This processor manages audio buffering and synchronization, providing both merged and
track-specific audio access through event handlers. It supports various audio configurations
including sample rate conversion and mono/stereo output.
You can provide the desired output `sample_rate` and incoming audio frames
will resampled to match it. Also, you can provide the number of channels, 1
for mono and 2 for stereo. With mono audio user and bot audio will be mixed,
in the case of stereo the left channel will be used for the user's audio and
the right channel for the bot.
Events:
on_audio_data: Triggered when buffer_size is reached, providing merged audio
on_track_audio_data: Triggered when buffer_size is reached, providing separate tracks
Most of the time, user audio will be a continuous stream but it's possible
that in some cases only the spoken audio is sent. To accomodate for those
cases make sure to set `user_continuous_stream` accordingly.
Args:
sample_rate (Optional[int]): Desired output sample rate. If None, uses source rate
num_channels (int): Number of channels (1 for mono, 2 for stereo). Defaults to 1
buffer_size (int): Size of buffer before triggering events. 0 for no buffering
user_continuous_stream (bool): Whether user audio is continuous or speech-only
Audio handling:
- Mono output (num_channels=1): User and bot audio are mixed
- Stereo output (num_channels=2): User audio on left, bot audio on right
- Automatic resampling of incoming audio to match desired sample_rate
- Silence insertion for non-continuous audio streams
- Buffer synchronization between user and bot audio
Note:
When user_continuous_stream is False, the processor expects only speech
segments and will handle silence insertion between segments automatically.
"""
def __init__(
@@ -77,45 +65,21 @@ class AudioBufferProcessor(FrameProcessor):
self._resampler = create_default_resampler()
self._register_event_handler("on_audio_data")
self._register_event_handler("on_track_audio_data")
@property
def sample_rate(self) -> int:
"""Current sample rate of the audio processor.
Returns:
int: The sample rate in Hz
"""
return self._sample_rate
@property
def num_channels(self) -> int:
"""Number of channels in the audio output.
Returns:
int: Number of channels (1 for mono, 2 for stereo)
"""
return self._num_channels
def has_audio(self) -> bool:
"""Check if both user and bot audio buffers contain data.
Returns:
bool: True if both buffers contain audio data
"""
return self._buffer_has_audio(self._user_audio_buffer) and self._buffer_has_audio(
self._bot_audio_buffer
)
def merge_audio_buffers(self) -> bytes:
"""Merge user and bot audio buffers into a single audio stream.
For mono output, audio is mixed. For stereo output, user audio is placed
on the left channel and bot audio on the right channel.
Returns:
bytes: Mixed audio data
"""
if self._num_channels == 1:
return mix_audio(bytes(self._user_audio_buffer), bytes(self._bot_audio_buffer))
elif self._num_channels == 2:
@@ -126,23 +90,14 @@ class AudioBufferProcessor(FrameProcessor):
return b""
async def start_recording(self):
"""Start recording audio from both user and bot.
Initializes recording state and resets audio buffers.
"""
self._recording = True
self._reset_recording()
async def stop_recording(self):
"""Stop recording and trigger final audio data handlers.
Calls audio handlers with any remaining buffered audio before stopping.
"""
await self._call_on_audio_data_handler()
self._recording = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming audio frames and manage audio buffers."""
await super().process_frame(frame, direction)
# Update output sample rate if necessary.
@@ -205,21 +160,10 @@ class AudioBufferProcessor(FrameProcessor):
if not self.has_audio() or not self._recording:
return
# Call original handler with merged audio
merged_audio = self.merge_audio_buffers()
await self._call_event_handler(
"on_audio_data", merged_audio, self._sample_rate, self._num_channels
)
# Call new handler with separate tracks
await self._call_event_handler(
"on_track_audio_data",
bytes(self._user_audio_buffer),
bytes(self._bot_audio_buffer),
self._sample_rate,
self._num_channels,
)
self._reset_audio_buffers()
def _buffer_has_audio(self, buffer: bytearray) -> bool:

View File

@@ -5,6 +5,7 @@
#
import asyncio
import inspect
from enum import Enum
from typing import Awaitable, Callable, Coroutine, Optional
@@ -23,7 +24,7 @@ from pipecat.frames.frames import (
from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.utils.asyncio import BaseTaskManager
from pipecat.utils.base_object import BaseObject
from pipecat.utils.utils import obj_count, obj_id
class FrameDirection(Enum):
@@ -31,7 +32,7 @@ class FrameDirection(Enum):
UPSTREAM = 2
class FrameProcessor(BaseObject):
class FrameProcessor:
def __init__(
self,
*,
@@ -39,11 +40,14 @@ class FrameProcessor(BaseObject):
metrics: Optional[FrameProcessorMetrics] = None,
**kwargs,
):
super().__init__(name=name)
self._id: int = obj_id()
self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
self._parent: Optional["FrameProcessor"] = None
self._prev: Optional["FrameProcessor"] = None
self._next: Optional["FrameProcessor"] = None
self._event_handlers: dict = {}
# Clock
self._clock: Optional[BaseClock] = None
@@ -250,6 +254,23 @@ class FrameProcessor(BaseObject):
else:
await self.__push_queue.put((frame, direction))
def event_handler(self, event_name: str):
def decorator(handler):
self.add_event_handler(event_name, handler)
return handler
return decorator
def add_event_handler(self, event_name: str, handler):
if event_name not in self._event_handlers:
raise Exception(f"Event handler {event_name} not registered")
self._event_handlers[event_name].append(handler)
def _register_event_handler(self, event_name: str):
if event_name in self._event_handlers:
raise Exception(f"Event handler {event_name} already registered")
self._event_handlers[event_name] = []
async def __start(self, frame: StartFrame):
self.__create_input_task()
self.__create_push_task()
@@ -364,3 +385,16 @@ class FrameProcessor(BaseObject):
(frame, direction) = await self.__push_queue.get()
await self.__internal_push_frame(frame, direction)
self.__push_queue.task_done()
async def _call_event_handler(self, event_name: str, *args, **kwargs):
try:
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.exception(f"Exception in event handler {event_name}: {e}")
def __str__(self):
return self.name

View File

@@ -375,22 +375,6 @@ class RTVIMetricsMessage(BaseModel):
data: Mapping[str, Any]
class RTVIServerMessage(BaseModel):
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["server-message"] = "server-message"
data: Any
@dataclass
class RTVIServerMessageFrame(SystemFrame):
"""A frame for sending server messages to the client."""
data: Any
def __str__(self):
return f"{self.name}(data: {self.data})"
class RTVIFrameProcessor(FrameProcessor):
def __init__(self, direction: FrameDirection = FrameDirection.DOWNSTREAM, **kwargs):
super().__init__(**kwargs)
@@ -726,9 +710,6 @@ class RTVIObserver(BaseObserver):
mark_as_seen = False
elif isinstance(frame, MetricsFrame):
await self._handle_metrics(frame)
elif isinstance(frame, RTVIServerMessageFrame):
message = RTVIServerMessage(data=frame.data)
await self.push_transport_message_urgent(message)
if mark_as_seen:
self._frames_seen.add(frame.id)

View File

@@ -8,12 +8,10 @@ import asyncio
import io
import wave
from abc import abstractmethod
from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional, Tuple, Type
from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional, Tuple
from loguru import logger
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.services.open_ai_adapter import OpenAILLMAdapter
from pipecat.audio.utils import calculate_audio_volume, exp_smoothing
from pipecat.frames.frames import (
AudioRawFrame,
@@ -139,23 +137,10 @@ class AIService(FrameProcessor):
class LLMService(AIService):
"""This class is a no-op but serves as a base class for LLM services."""
# OpenAILLMAdapter is used as the default adapter since it aligns with most LLM implementations.
# However, subclasses should override this with a more specific adapter when necessary.
adapter_class: Type[BaseLLMAdapter] = OpenAILLMAdapter
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._callbacks = {}
self._start_callbacks = {}
self._adapter = self.adapter_class()
def get_llm_adapter(self) -> BaseLLMAdapter:
return self._adapter
def create_context_aggregator(
self, context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = True
) -> Any:
pass
self._register_event_handler("on_completion_timeout")

View File

@@ -11,14 +11,13 @@ import io
import json
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Union
from typing import Any, Dict, List, Optional, Union
import httpx
from loguru import logger
from PIL import Image
from pydantic import BaseModel, Field
from pipecat.adapters.services.anthropic_adapter import AnthropicLLMAdapter
from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
@@ -86,9 +85,6 @@ class AnthropicLLMService(LLMService):
use `AsyncAnthropicBedrock` and `AsyncAnthropicVertex` clients
"""
# Overriding the default adapter to use the Anthropic one.
adapter_class = AnthropicLLMAdapter
class InputParams(BaseModel):
enable_prompt_caching_beta: Optional[bool] = False
max_tokens: Optional[int] = Field(default_factory=lambda: 4096, ge=1)
@@ -127,38 +123,16 @@ class AnthropicLLMService(LLMService):
def enable_prompt_caching_beta(self) -> bool:
return self._enable_prompt_caching_beta
@staticmethod
def create_context_aggregator(
self,
context: OpenAILLMContext,
*,
user_kwargs: Mapping[str, Any] = {},
assistant_kwargs: Mapping[str, Any] = {},
context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = True
) -> AnthropicContextAggregatorPair:
"""Create an instance of AnthropicContextAggregatorPair from an
OpenAILLMContext. Constructor keyword arguments for both the user and
assistant aggregators can be provided.
Args:
context (OpenAILLMContext): The LLM context.
user_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the user context aggregator constructor. Defaults
to an empty mapping.
assistant_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the assistant context aggregator
constructor. Defaults to an empty mapping.
Returns:
AnthropicContextAggregatorPair: A pair of context aggregators, one
for the user and one for the assistant, encapsulated in an
AnthropicContextAggregatorPair.
"""
context.set_llm_adapter(self.get_llm_adapter())
if isinstance(context, OpenAILLMContext):
context = AnthropicLLMContext.from_openai_context(context)
user = AnthropicUserContextAggregator(context, **user_kwargs)
assistant = AnthropicAssistantContextAggregator(context, **assistant_kwargs)
user = AnthropicUserContextAggregator(context)
assistant = AnthropicAssistantContextAggregator(
context, expect_stripped_words=assistant_expect_stripped_words
)
return AnthropicContextAggregatorPair(_user=user, _assistant=assistant)
async def _process_context(self, context: OpenAILLMContext):
@@ -178,7 +152,7 @@ class AnthropicLLMService(LLMService):
await self.start_processing_metrics()
logger.debug(
f"{self}: Generating chat [{context.system}] | [{context.get_messages_for_logging()}]"
f"Generating chat: {context.system} | {context.get_messages_for_logging()}"
)
messages = context.messages
@@ -388,7 +362,6 @@ class AnthropicLLMContext(OpenAILLMContext):
tools=openai_context.tools,
tool_choice=openai_context.tool_choice,
)
self.set_llm_adapter(openai_context.get_llm_adapter())
self._restructure_from_openai_messages()
return self

View File

@@ -197,7 +197,7 @@ class PollyTTSService(TTSService):
return audio_data
return None
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
await self.start_ttfb_metrics()

View File

@@ -578,7 +578,7 @@ class AzureTTSService(AzureBaseTTSService):
self._audio_queue.put_nowait(None)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
if self._speech_synthesizer is None:
@@ -645,7 +645,7 @@ class AzureHttpTTSService(AzureBaseTTSService):
)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
await self.start_ttfb_metrics()

View File

@@ -62,21 +62,17 @@ class CanonicalMetricsService(AIService):
self,
*,
aiohttp_session: aiohttp.ClientSession,
audio_buffer_processor: AudioBufferProcessor,
call_id: str,
assistant: str,
api_key: str,
api_url: str = "https://voiceapp.canonical.chat/api/v1",
assistant_speaks_first: bool = True,
output_dir: str = "recordings",
audio_buffer_processor: Optional[AudioBufferProcessor] = None,
context: Optional[OpenAILLMContext] = None,
**kwargs,
):
super().__init__(**kwargs)
# Validate that at least one of audio_buffer_processor or context is provided
if audio_buffer_processor is None and context is None:
raise ValueError("At least one of audio_buffer_processor or context must be specified")
self._aiohttp_session = aiohttp_session
self._audio_buffer_processor = audio_buffer_processor
self._api_key = api_key
@@ -89,36 +85,16 @@ class CanonicalMetricsService(AIService):
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._process_completion()
await self._process_audio()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._process_completion()
await self._process_audio()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
async def _process_completion(self):
if self._audio_buffer_processor is not None:
await self._process_audio()
elif self._context is not None:
await self._process_transcript()
async def _process_transcript(self):
params = {
"callId": self._call_id,
"assistant": {"id": self._assistant, "speaksFirst": self._assistant_speaks_first},
"transcript": self._context.messages,
}
response = await self._aiohttp_session.post(
f"{self._api_url}/call",
headers=self._request_headers(),
json=params,
)
if not response.ok:
logger.error(f"Failed to process transcript: {await response.text()}")
async def _process_audio(self):
audio_buffer_processor = self._audio_buffer_processor

View File

@@ -272,7 +272,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
logger.error(f"{self} error, unknown message type: {msg}")
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
if not self._websocket:
@@ -358,7 +358,7 @@ class CartesiaHttpTTSService(TTSService):
await self._client.close()
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
voice_controls = None

View File

@@ -70,7 +70,7 @@ class DeepgramTTSService(TTSService):
return True
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
options = SpeakOptions(
model=self._voice_id,

View File

@@ -116,44 +116,6 @@ def output_format_from_sample_rate(sample_rate: int) -> str:
return "pcm_16000"
def build_elevenlabs_voice_settings(
settings: Dict[str, Any],
) -> Optional[Dict[str, Union[float, bool]]]:
"""Build voice settings dictionary for ElevenLabs based on provided settings.
Args:
settings: Dictionary containing voice settings parameters
Returns:
Dictionary of voice settings or None if required parameters are missing
"""
voice_settings = {}
if settings["stability"] is not None and settings["similarity_boost"] is not None:
voice_settings["stability"] = settings["stability"]
voice_settings["similarity_boost"] = settings["similarity_boost"]
if settings["style"] is not None:
voice_settings["style"] = settings["style"]
if settings["use_speaker_boost"] is not None:
voice_settings["use_speaker_boost"] = settings["use_speaker_boost"]
if settings["speed"] is not None:
voice_settings["speed"] = settings["speed"]
else:
if settings["style"] is not None:
logger.warning(
"'style' is set but will not be applied because 'stability' and 'similarity_boost' are not both set."
)
if settings["use_speaker_boost"] is not None:
logger.warning(
"'use_speaker_boost' is set but will not be applied because 'stability' and 'similarity_boost' are not both set."
)
if settings["speed"] is not None:
logger.warning(
"'speed' is set but will not be applied because 'stability' and 'similarity_boost' are not both set."
)
return voice_settings or None
def calculate_word_times(
alignment_info: Mapping[str, Any], cumulative_time: float
) -> List[Tuple[str, float]]:
@@ -183,7 +145,6 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
similarity_boost: Optional[float] = None
style: Optional[float] = None
use_speaker_boost: Optional[bool] = None
speed: Optional[float] = None
auto_mode: Optional[bool] = True
@model_validator(mode="after")
@@ -241,7 +202,6 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
"similarity_boost": params.similarity_boost,
"style": params.style,
"use_speaker_boost": params.use_speaker_boost,
"speed": params.speed,
"auto_mode": str(params.auto_mode).lower(),
}
self.set_model_name(model)
@@ -264,7 +224,28 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
return language_to_elevenlabs_language(language)
def _set_voice_settings(self):
return build_elevenlabs_voice_settings(self._settings)
voice_settings = {}
if (
self._settings["stability"] is not None
and self._settings["similarity_boost"] is not None
):
voice_settings["stability"] = self._settings["stability"]
voice_settings["similarity_boost"] = self._settings["similarity_boost"]
if self._settings["style"] is not None:
voice_settings["style"] = self._settings["style"]
if self._settings["use_speaker_boost"] is not None:
voice_settings["use_speaker_boost"] = self._settings["use_speaker_boost"]
else:
if self._settings["style"] is not None:
logger.warning(
"'style' is set but will not be applied because 'stability' and 'similarity_boost' are not both set."
)
if self._settings["use_speaker_boost"] is not None:
logger.warning(
"'use_speaker_boost' is set but will not be applied because 'stability' and 'similarity_boost' are not both set."
)
return voice_settings or None
async def set_model(self, model: str):
await super().set_model(model)
@@ -414,7 +395,7 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
await self._websocket.send(json.dumps(msg))
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
if not self._websocket:
@@ -460,7 +441,6 @@ class ElevenLabsHttpTTSService(TTSService):
similarity_boost: Optional[float] = None
style: Optional[float] = None
use_speaker_boost: Optional[bool] = None
speed: Optional[float] = None
def __init__(
self,
@@ -490,7 +470,6 @@ class ElevenLabsHttpTTSService(TTSService):
"similarity_boost": params.similarity_boost,
"style": params.style,
"use_speaker_boost": params.use_speaker_boost,
"speed": params.speed,
}
self.set_model_name(model)
self.set_voice(voice_id)
@@ -500,8 +479,34 @@ class ElevenLabsHttpTTSService(TTSService):
def can_generate_metrics(self) -> bool:
return True
def _set_voice_settings(self):
return build_elevenlabs_voice_settings(self._settings)
def _set_voice_settings(self) -> Optional[Dict[str, Union[float, bool]]]:
"""Configure voice settings if stability and similarity_boost are provided.
Returns:
Dictionary of voice settings or None if required parameters are missing.
"""
voice_settings: Dict[str, Union[float, bool]] = {}
if (
self._settings["stability"] is not None
and self._settings["similarity_boost"] is not None
):
voice_settings["stability"] = float(self._settings["stability"])
voice_settings["similarity_boost"] = float(self._settings["similarity_boost"])
if self._settings["style"] is not None:
voice_settings["style"] = float(self._settings["style"])
if self._settings["use_speaker_boost"] is not None:
voice_settings["use_speaker_boost"] = bool(self._settings["use_speaker_boost"])
else:
if self._settings["style"] is not None:
logger.warning(
"'style' is set but will not be applied because 'stability' and 'similarity_boost' are not both set."
)
if self._settings["use_speaker_boost"] is not None:
logger.warning(
"'use_speaker_boost' is set but will not be applied because 'stability' and 'similarity_boost' are not both set."
)
return voice_settings or None
async def start(self, frame: StartFrame):
await super().start(frame)
@@ -516,7 +521,7 @@ class ElevenLabsHttpTTSService(TTSService):
Yields:
Frames containing audio data and status information
"""
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
url = f"{self._base_url}/v1/text-to-speech/{self._voice_id}/stream"
@@ -565,12 +570,10 @@ class ElevenLabsHttpTTSService(TTSService):
await self.start_tts_usage_metrics(text)
# Process the streaming response
CHUNK_SIZE = 1024
yield TTSStartedFrame()
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
if len(chunk) > 0:
async for chunk in response.content:
if chunk:
await self.stop_ttfb_metrics()
yield TTSAudioRawFrame(chunk, self.sample_rate, 1)
except Exception as e:

View File

@@ -178,7 +178,7 @@ class FishAudioTTSService(InterruptibleTTSService):
logger.error(f"Error processing message: {e}")
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating Fish TTS: [{text}]")
logger.debug(f"Generating Fish TTS: [{text}]")
try:
if not self._websocket or self._websocket.closed:
await self._connect()

View File

@@ -9,14 +9,12 @@ import base64
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Mapping, Optional, Union
from typing import Any, Dict, List, Optional
import websockets
from loguru import logger
from pydantic import BaseModel, Field
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.gemini_adapter import GeminiLLMAdapter
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
@@ -154,9 +152,6 @@ class InputParams(BaseModel):
class GeminiMultimodalLiveLLMService(LLMService):
# Overriding the default adapter to use the Gemini one.
adapter_class = GeminiLLMAdapter
def __init__(
self,
*,
@@ -167,7 +162,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
start_audio_paused: bool = False,
start_video_paused: bool = False,
system_instruction: Optional[str] = None,
tools: Optional[Union[List[dict], ToolsSchema]] = None,
tools: Optional[List[dict]] = None,
transcribe_user_audio: bool = False,
transcribe_model_audio: bool = False,
params: InputParams = InputParams(),
@@ -440,7 +435,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
)
if self._tools:
logger.debug(f"Gemini is configuring to use tools{self._tools}")
config.setup.tools = self.get_llm_adapter().from_standard_tools(self._tools)
config.setup.tools = self._tools
await self.send_client_event(config)
except Exception as e:
@@ -706,39 +701,11 @@ class GeminiMultimodalLiveLLMService(LLMService):
await self.push_frame(TTSStoppedFrame())
def create_context_aggregator(
self,
context: OpenAILLMContext,
*,
user_kwargs: Mapping[str, Any] = {},
assistant_kwargs: Mapping[str, Any] = {},
self, context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = False
) -> GeminiMultimodalLiveContextAggregatorPair:
"""Create an instance of GeminiMultimodalLiveContextAggregatorPair from
an OpenAILLMContext. Constructor keyword arguments for both the user and
assistant aggregators can be provided.
Args:
context (OpenAILLMContext): The LLM context.
user_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the user context aggregator constructor. Defaults
to an empty mapping.
assistant_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the assistant context aggregator
constructor. Defaults to an empty mapping.
Returns:
GeminiMultimodalLiveContextAggregatorPair: A pair of context
aggregators, one for the user and one for the assistant,
encapsulated in an GeminiMultimodalLiveContextAggregatorPair.
"""
context.set_llm_adapter(self.get_llm_adapter())
GeminiMultimodalLiveContext.upgrade(context)
user = GeminiMultimodalLiveUserContextAggregator(context, **user_kwargs)
default_assistant_kwargs = {"expect_stripped_words": False}
default_assistant_kwargs.update(assistant_kwargs)
user = GeminiMultimodalLiveUserContextAggregator(context)
assistant = GeminiMultimodalLiveAssistantContextAggregator(
context, **default_assistant_kwargs
context, expect_stripped_words=assistant_expect_stripped_words
)
return GeminiMultimodalLiveContextAggregatorPair(_user=user, _assistant=assistant)

View File

@@ -136,7 +136,6 @@ class GladiaSTTService(STTService):
maximum_duration_without_endpointing: Optional[int] = 10
audio_enhancer: Optional[bool] = None
words_accurate_timestamps: Optional[bool] = None
speech_threshold: Optional[float] = 0.99
def __init__(
self,
@@ -149,6 +148,7 @@ class GladiaSTTService(STTService):
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._url = url
self._settings = {
@@ -166,7 +166,6 @@ class GladiaSTTService(STTService):
"maximum_duration_without_endpointing": params.maximum_duration_without_endpointing,
"pre_processing": {
"audio_enhancer": params.audio_enhancer,
"speech_threshold": params.speech_threshold,
},
"realtime_processing": {
"words_accurate_timestamps": params.words_accurate_timestamps,

View File

@@ -12,16 +12,12 @@ import os
import time
from google.api_core.exceptions import DeadlineExceeded
from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk
from pipecat.adapters.services.gemini_adapter import GeminiLLMAdapter
# Suppress gRPC fork warnings
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "false"
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional, Union
from typing import Any, AsyncGenerator, Dict, List, Literal, Optional, Union
from loguru import logger
from PIL import Image
@@ -58,10 +54,7 @@ from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import ImageGenService, LLMService, STTService, TTSService
from pipecat.services.google.frames import LLMSearchResponseFrame
from pipecat.services.openai import (
BaseOpenAILLMService,
OpenAIAssistantContextAggregator,
OpenAILLMService,
OpenAIUnhandledFunctionException,
OpenAIUserContextAggregator,
)
from pipecat.transcriptions.language import Language
@@ -729,9 +722,7 @@ class GoogleLLMContext(OpenAILLMContext):
self.add_message(glm.Content(role="user", parts=parts))
def add_audio_frames_message(
self, *, audio_frames: list[AudioRawFrame], text: str = "Audio follows"
):
def add_audio_frames_message(self, *, audio_frames: list[AudioRawFrame], text: str = None):
if not audio_frames:
return
@@ -740,9 +731,8 @@ class GoogleLLMContext(OpenAILLMContext):
parts = []
data = b"".join(frame.audio for frame in audio_frames)
# NOTE(aleix): According to the docs only text or inline_data should be needed.
# (see https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference)
parts.append(glm.Part(text=text))
if text:
parts.append(glm.Part(text=text))
parts.append(
glm.Part(
inline_data=glm.Blob(
@@ -952,9 +942,6 @@ class GoogleLLMService(LLMService):
franca for all LLM services, so that it is easy to switch between different LLMs.
"""
# Overriding the default adapter to use the Gemini one.
adapter_class = GeminiLLMAdapter
class InputParams(BaseModel):
max_tokens: Optional[int] = Field(default=4096, ge=1)
temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0)
@@ -1008,8 +995,8 @@ class GoogleLLMService(LLMService):
try:
logger.debug(
# f"{self}: Generating chat [{self._system_instruction}] | [{context.get_messages_for_logging()}]"
f"{self}: Generating chat [{context.get_messages_for_logging()}]"
# f"Generating chat: {self._system_instruction} | {context.get_messages_for_logging()}"
f"Generating chat: {context.get_messages_for_logging()}"
)
messages = context.messages
@@ -1185,153 +1172,17 @@ class GoogleLLMService(LLMService):
if context:
await self._process_context(context)
@staticmethod
def create_context_aggregator(
self,
context: OpenAILLMContext,
*,
user_kwargs: Mapping[str, Any] = {},
assistant_kwargs: Mapping[str, Any] = {},
context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = True
) -> GoogleContextAggregatorPair:
"""Create an instance of GoogleContextAggregatorPair from an
OpenAILLMContext. Constructor keyword arguments for both the user and
assistant aggregators can be provided.
Args:
context (OpenAILLMContext): The LLM context.
user_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the user context aggregator constructor. Defaults
to an empty mapping.
assistant_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the assistant context aggregator
constructor. Defaults to an empty mapping.
Returns:
GoogleContextAggregatorPair: A pair of context aggregators, one for
the user and one for the assistant, encapsulated in an
GoogleContextAggregatorPair.
"""
context.set_llm_adapter(self.get_llm_adapter())
if isinstance(context, OpenAILLMContext):
context = GoogleLLMContext.upgrade_to_google(context)
user = GoogleUserContextAggregator(context, **user_kwargs)
assistant = GoogleAssistantContextAggregator(context, **assistant_kwargs)
return GoogleContextAggregatorPair(_user=user, _assistant=assistant)
class GoogleLLMOpenAIBetaService(OpenAILLMService):
"""This class implements inference with Google's AI LLM models using the OpenAI format.
Ref - https://ai.google.dev/gemini-api/docs/openai
"""
def __init__(
self,
*,
api_key: str,
base_url: str = "https://generativelanguage.googleapis.com/v1beta/openai/",
model: str = "gemini-2.0-flash",
**kwargs,
):
super().__init__(api_key=api_key, base_url=base_url, model=model, **kwargs)
async def _process_context(self, context: OpenAILLMContext):
functions_list = []
arguments_list = []
tool_id_list = []
func_idx = 0
function_name = ""
arguments = ""
tool_call_id = ""
await self.start_ttfb_metrics()
chunk_stream: AsyncStream[ChatCompletionChunk] = await self._stream_chat_completions(
context
user = GoogleUserContextAggregator(context)
assistant = GoogleAssistantContextAggregator(
context, expect_stripped_words=assistant_expect_stripped_words
)
async for chunk in chunk_stream:
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,
completion_tokens=chunk.usage.completion_tokens,
total_tokens=chunk.usage.total_tokens,
)
await self.start_llm_usage_metrics(tokens)
if chunk.choices is None or len(chunk.choices) == 0:
continue
await self.stop_ttfb_metrics()
if not chunk.choices[0].delta:
continue
if chunk.choices[0].delta.tool_calls:
# We're streaming the LLM response to enable the fastest response times.
# For text, we just yield each chunk as we receive it and count on consumers
# to do whatever coalescing they need (eg. to pass full sentences to TTS)
#
# If the LLM is a function call, we'll do some coalescing here.
# If the response contains a function name, we'll yield a frame to tell consumers
# that they can start preparing to call the function with that name.
# We accumulate all the arguments for the rest of the streamed response, then when
# the response is done, we package up all the arguments and the function name and
# yield a frame containing the function name and the arguments.
logger.debug(f"Tool call: {chunk.choices[0].delta.tool_calls}")
tool_call = chunk.choices[0].delta.tool_calls[0]
if tool_call.index != func_idx:
functions_list.append(function_name)
arguments_list.append(arguments)
tool_id_list.append(tool_call_id)
function_name = ""
arguments = ""
tool_call_id = ""
func_idx += 1
if tool_call.function and tool_call.function.name:
function_name += tool_call.function.name
tool_call_id = tool_call.id
if tool_call.function and tool_call.function.arguments:
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
elif chunk.choices[0].delta.content:
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.content))
# if we got a function name and arguments, check to see if it's a function with
# a registered handler. If so, run the registered callback, save the result to
# the context, and re-prompt to get a chat answer. If we don't have a registered
# handler, raise an exception.
if function_name and arguments:
# added to the list as last function name and arguments not added to the list
functions_list.append(function_name)
arguments_list.append(arguments)
tool_id_list.append(tool_call_id)
logger.debug(
f"Function list: {functions_list}, Arguments list: {arguments_list}, Tool ID list: {tool_id_list}"
)
for index, (function_name, arguments, tool_id) in enumerate(
zip(functions_list, arguments_list, tool_id_list), start=1
):
if function_name == "":
# TODO: Remove the _process_context method once Google resolves the bug
# where the index is incorrectly set to None instead of returning the actual index,
# which currently results in an empty function name('').
continue
if self.has_function(function_name):
run_llm = False
arguments = json.loads(arguments)
await self.call_function(
context=context,
function_name=function_name,
arguments=arguments,
tool_call_id=tool_id,
run_llm=run_llm,
)
else:
raise OpenAIUnhandledFunctionException(
f"The LLM tried to call a function named '{function_name}', but there isn't a callback registered for that function."
)
return GoogleContextAggregatorPair(_user=user, _assistant=assistant)
class GoogleTTSService(TTSService):
@@ -1443,7 +1294,7 @@ class GoogleTTSService(TTSService):
return ssml
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
await self.start_ttfb_metrics()

View File

@@ -7,7 +7,7 @@
import json
from dataclasses import dataclass
from typing import Any, Mapping, Optional
from typing import Optional
from loguru import logger
@@ -206,34 +206,12 @@ class GrokLLMService(OpenAILLMService):
if tokens.completion_tokens > self._completion_tokens:
self._completion_tokens = tokens.completion_tokens
@staticmethod
def create_context_aggregator(
self,
context: OpenAILLMContext,
*,
user_kwargs: Mapping[str, Any] = {},
assistant_kwargs: Mapping[str, Any] = {},
context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = True
) -> GrokContextAggregatorPair:
"""Create an instance of GrokContextAggregatorPair from an
OpenAILLMContext. Constructor keyword arguments for both the user and
assistant aggregators can be provided.
Args:
context (OpenAILLMContext): The LLM context.
user_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the user context aggregator constructor. Defaults
to an empty mapping.
assistant_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the assistant context aggregator
constructor. Defaults to an empty mapping.
Returns:
GrokContextAggregatorPair: A pair of context aggregators, one for
the user and one for the assistant, encapsulated in an
GrokContextAggregatorPair.
"""
context.set_llm_adapter(self.get_llm_adapter())
user = OpenAIUserContextAggregator(context, **user_kwargs)
assistant = GrokAssistantContextAggregator(context, **assistant_kwargs)
user = OpenAIUserContextAggregator(context)
assistant = GrokAssistantContextAggregator(
context, expect_stripped_words=assistant_expect_stripped_words
)
return GrokContextAggregatorPair(_user=user, _assistant=assistant)

View File

@@ -196,7 +196,7 @@ class LmntTTSService(InterruptibleTTSService):
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate TTS audio from text."""
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
if not self._websocket:

View File

@@ -8,7 +8,7 @@ import base64
import io
import json
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict, List, Literal, Mapping, Optional
from typing import Any, AsyncGenerator, Dict, List, Literal, Optional
import aiohttp
import httpx
@@ -178,7 +178,7 @@ class BaseOpenAILLMService(LLMService):
async def _stream_chat_completions(
self, context: OpenAILLMContext
) -> AsyncStream[ChatCompletionChunk]:
logger.debug(f"{self}: Generating chat [{context.get_messages_for_logging()}]")
logger.debug(f"Generating chat: {context.get_messages_for_logging()}")
messages: List[ChatCompletionMessageParam] = context.get_messages()
@@ -343,35 +343,14 @@ class OpenAILLMService(BaseOpenAILLMService):
):
super().__init__(model=model, params=params, **kwargs)
@staticmethod
def create_context_aggregator(
self,
context: OpenAILLMContext,
*,
user_kwargs: Mapping[str, Any] = {},
assistant_kwargs: Mapping[str, Any] = {},
context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = True
) -> OpenAIContextAggregatorPair:
"""Create an instance of OpenAIContextAggregatorPair from an
OpenAILLMContext. Constructor keyword arguments for both the user and
assistant aggregators can be provided.
Args:
context (OpenAILLMContext): The LLM context.
user_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the user context aggregator constructor. Defaults
to an empty mapping.
assistant_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the assistant context aggregator
constructor. Defaults to an empty mapping.
Returns:
OpenAIContextAggregatorPair: A pair of context aggregators, one for
the user and one for the assistant, encapsulated in an
OpenAIContextAggregatorPair.
"""
context.set_llm_adapter(self.get_llm_adapter())
user = OpenAIUserContextAggregator(context, **user_kwargs)
assistant = OpenAIAssistantContextAggregator(context, **assistant_kwargs)
user = OpenAIUserContextAggregator(context)
assistant = OpenAIAssistantContextAggregator(
context, expect_stripped_words=assistant_expect_stripped_words
)
return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)
@@ -529,7 +508,7 @@ class OpenAITTSService(TTSService):
)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
await self.start_ttfb_metrics()
@@ -551,10 +530,8 @@ class OpenAITTSService(TTSService):
await self.start_tts_usage_metrics(text)
CHUNK_SIZE = 1024
yield TTSStartedFrame()
async for chunk in r.iter_bytes(CHUNK_SIZE):
async for chunk in r.iter_bytes(8192):
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)

View File

@@ -1,3 +1,2 @@
from .azure import AzureRealtimeBetaLLMService
from .events import InputAudioTranscription, SessionProperties, TurnDetection
from .openai import OpenAIRealtimeBetaLLMService

View File

@@ -1,64 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from loguru import logger
from .openai import OpenAIRealtimeBetaLLMService
try:
import websockets
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
)
raise Exception(f"Missing module: {e}")
class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService):
"""Subclass of OpenAI Realtime API Service with adjustments for Azure's wss connection."""
def __init__(
self,
*,
api_key: str,
base_url: str,
**kwargs,
):
"""Constructor takes the same arguments as the parent class, OpenAIRealtimeBetaLLMService.
Note that the following are required arguments:
api_key: The API key for the Azure OpenAI service.
base_url: The base URL for the Azure OpenAI service.
base_url should be set to the full Azure endpoint URL including the api-version and the deployment name. For example,
wss://my-project.openai.azure.com/openai/realtime?api-version=2024-10-01-preview&deployment=my-realtime-deployment
"""
super().__init__(base_url=base_url, api_key=api_key, **kwargs)
self.api_key = api_key
self.base_url = base_url
async def _connect(self):
try:
if self._websocket:
# Here we assume that if we have a websocket, we are connected. We
# handle disconnections in the send/recv code paths.
return
logger.info(f"Connecting to {self.base_url}, api key: {self.api_key}")
self._websocket = await websockets.connect(
uri=self.base_url,
extra_headers={
"api-key": self.api_key,
},
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None

View File

@@ -4,16 +4,14 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import base64
import json
import time
from dataclasses import dataclass
from typing import Any, Mapping
from loguru import logger
from pipecat.adapters.services.open_ai_realtime_adapter import OpenAIRealtimeLLMAdapter
try:
import websockets
except ModuleNotFoundError as e:
@@ -78,9 +76,6 @@ class OpenAIUnhandledFunctionException(Exception):
class OpenAIRealtimeBetaLLMService(LLMService):
# Overriding the default adapter to use the OpenAIRealtimeLLMAdapter one.
adapter_class = OpenAIRealtimeLLMAdapter
def __init__(
self,
*,
@@ -576,37 +571,11 @@ class OpenAIRealtimeBetaLLMService(LLMService):
await self.send_client_event(events.InputAudioBufferAppendEvent(audio=payload))
def create_context_aggregator(
self,
context: OpenAILLMContext,
*,
user_kwargs: Mapping[str, Any] = {},
assistant_kwargs: Mapping[str, Any] = {},
self, context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = False
) -> OpenAIContextAggregatorPair:
"""Create an instance of OpenAIContextAggregatorPair from an
OpenAILLMContext. Constructor keyword arguments for both the user and
assistant aggregators can be provided.
Args:
context (OpenAILLMContext): The LLM context.
user_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the user context aggregator constructor. Defaults
to an empty mapping.
assistant_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the assistant context aggregator
constructor. Defaults to an empty mapping.
Returns:
OpenAIContextAggregatorPair: A pair of context aggregators, one for
the user and one for the assistant, encapsulated in an
OpenAIContextAggregatorPair.
"""
context.set_llm_adapter(self.get_llm_adapter())
OpenAIRealtimeLLMContext.upgrade_to_realtime(context)
user = OpenAIRealtimeUserContextAggregator(context, **user_kwargs)
default_assistant_kwargs = {"expect_stripped_words": False}
default_assistant_kwargs.update(assistant_kwargs)
assistant = OpenAIRealtimeAssistantContextAggregator(context, **default_assistant_kwargs)
user = OpenAIRealtimeUserContextAggregator(context)
assistant = OpenAIRealtimeAssistantContextAggregator(
context, expect_stripped_words=assistant_expect_stripped_words
)
return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)

View File

@@ -269,7 +269,7 @@ class PlayHTTTSService(InterruptibleTTSService):
logger.error(f"Invalid JSON message: {message}")
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
# Reconnect if the websocket is closed
@@ -323,8 +323,7 @@ class PlayHTHttpTTSService(TTSService):
api_key: str,
user_id: str,
voice_url: str,
voice_engine: str = "Play3.0-mini",
protocol: str = "http", # Options: http, ws
voice_engine: str = "Play3.0-mini-http", # Options: Play3.0-mini-http, Play3.0-mini-ws
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
**kwargs,
@@ -338,24 +337,12 @@ class PlayHTHttpTTSService(TTSService):
user_id=self._user_id,
api_key=self._api_key,
)
# Check if voice_engine contains protocol information (backward compatibility)
if "-http" in voice_engine:
# Extract the base engine name
voice_engine = voice_engine.replace("-http", "")
protocol = "http"
elif "-ws" in voice_engine:
# Extract the base engine name
voice_engine = voice_engine.replace("-ws", "")
protocol = "ws"
self._settings = {
"language": self.language_to_service_language(params.language)
if params.language
else "english",
"format": Format.FORMAT_WAV,
"voice_engine": voice_engine,
"protocol": protocol,
"speed": params.speed,
"seed": params.seed,
}
@@ -392,26 +379,23 @@ class PlayHTHttpTTSService(TTSService):
return language_to_playht_language(language)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
options = self._create_options()
b = bytearray()
in_header = True
await self.start_ttfb_metrics()
playht_gen = self._client.tts(
text,
voice_engine=self._settings["voice_engine"],
protocol=self._settings["protocol"],
options=options,
text, voice_engine=self._settings["voice_engine"], options=options
)
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
b = bytearray()
in_header = True
async for chunk in playht_gen:
# skip the RIFF header.
if in_header:
@@ -426,10 +410,11 @@ class PlayHTHttpTTSService(TTSService):
fh.read(size)
(data, size) = struct.unpack("<4sI", fh.read(8))
in_header = False
elif len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
else:
if len(chunk):
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
except Exception as e:
logger.error(f"{self} error generating TTS: {e}")
finally:

View File

@@ -309,7 +309,7 @@ class RimeTTSService(AudioContextWordTTSService):
Yields:
Frames containing audio data and timing information.
"""
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
if not self._websocket:
await self._connect()
@@ -376,7 +376,7 @@ class RimeHttpTTSService(TTSService):
return True
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
headers = {
"Accept": "audio/pcm",
@@ -407,10 +407,10 @@ class RimeHttpTTSService(TTSService):
yield TTSStartedFrame()
# Process the streaming response
CHUNK_SIZE = 1024
chunk_size = 8192
async for chunk in response.content.iter_chunked(CHUNK_SIZE):
if len(chunk) > 0:
async for chunk in response.content.iter_chunked(chunk_size):
if chunk:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame

View File

@@ -101,7 +101,7 @@ class FastPitchTTSService(TTSService):
await self.start_ttfb_metrics()
yield TTSStartedFrame()
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
try:
queue = asyncio.Queue()

View File

@@ -118,7 +118,7 @@ class XTTSService(TTSService):
self._studio_speakers = await r.json()
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"{self}: Generating TTS [{text}]")
logger.debug(f"Generating TTS: [{text}]")
if not self._studio_speakers:
logger.error(f"{self} no studio speakers available")
@@ -150,10 +150,8 @@ class XTTSService(TTSService):
yield TTSStartedFrame()
CHUNK_SIZE = 1024
buffer = bytearray()
async for chunk in r.content.iter_chunked(CHUNK_SIZE):
async for chunk in r.content.iter_chunked(1024):
if len(chunk) > 0:
await self.stop_ttfb_metrics()
# Append new chunk to the buffer.

View File

@@ -10,7 +10,6 @@ from typing import Optional
from loguru import logger
from pipecat.audio.turn.base_turn_analyzer import BaseEndOfTurnAnalyzer, EndOfTurnState
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.frames.frames import (
BotInterruptionFrame,
@@ -25,7 +24,6 @@ from pipecat.frames.frames import (
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
UserEndOfTurnFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADParamsUpdateFrame,
@@ -66,19 +64,12 @@ class BaseInputTransport(FrameProcessor):
def vad_analyzer(self) -> Optional[VADAnalyzer]:
return self._params.vad_analyzer
@property
def end_of_turn_analyzer(self) -> Optional[BaseEndOfTurnAnalyzer]:
return self._params.end_of_turn_analyzer
async def start(self, frame: StartFrame):
self._sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
# Configure VAD analyzer.
if self._params.vad_enabled and self._params.vad_analyzer:
self._params.vad_analyzer.set_sample_rate(self._sample_rate)
# Configure End of turn analyzer.
if self._params.end_of_turn_analyzer:
self._params.end_of_turn_analyzer.set_sample_rate(self._sample_rate)
# Start audio filter.
if self._params.audio_in_filter:
await self._params.audio_in_filter.start(self._sample_rate)
@@ -207,25 +198,8 @@ class BaseInputTransport(FrameProcessor):
vad_state = new_vad_state
return vad_state
async def _end_of_turn_analyze(self, audio_frame: InputAudioRawFrame) -> EndOfTurnState:
state = EndOfTurnState.INCOMPLETE
if self.end_of_turn_analyzer:
state = await self.get_event_loop().run_in_executor(
self._executor, self.end_of_turn_analyzer.analyze_audio, audio_frame.audio
)
return state
async def _handle_end_of_turn(
self, audio_frame: InputAudioRawFrame, end_of_turn_state: EndOfTurnState
):
new_eot_state = await self._end_of_turn_analyze(audio_frame)
if new_eot_state != end_of_turn_state:
await self.push_frame(UserEndOfTurnFrame())
return new_eot_state
async def _audio_task_handler(self):
vad_state: VADState = VADState.QUIET
end_of_turn_state: EndOfTurnState = EndOfTurnState.INCOMPLETE
while True:
frame: InputAudioRawFrame = await self._audio_in_queue.get()
@@ -241,9 +215,6 @@ class BaseInputTransport(FrameProcessor):
vad_state = await self._handle_vad(frame, vad_state)
audio_passthrough = self._params.vad_audio_passthrough
if self._params.end_of_turn_analyzer:
end_of_turn_state = await self._handle_end_of_turn(frame, end_of_turn_state)
# Push audio downstream if passthrough.
if audio_passthrough:
await self.push_frame(frame)

View File

@@ -232,9 +232,6 @@ class BaseOutputTransport(FrameProcessor):
await self.push_frame(BotStoppedSpeakingFrame())
await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
self._bot_speaking = False
# Clean audio buffer (there could be tiny left overs if not multiple
# to our output chunk size).
self._audio_buffer = bytearray()
#
# Sink tasks

View File

@@ -4,17 +4,18 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import abstractmethod
import inspect
from abc import ABC, abstractmethod
from typing import Optional
from loguru import logger
from pydantic import BaseModel, ConfigDict
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
from pipecat.audio.turn.base_turn_analyzer import BaseEndOfTurnAnalyzer
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.utils.base_object import BaseObject
from pipecat.utils.utils import obj_count, obj_id
class TransportParams(BaseModel):
@@ -40,10 +41,9 @@ class TransportParams(BaseModel):
vad_enabled: bool = False
vad_audio_passthrough: bool = False
vad_analyzer: Optional[VADAnalyzer] = None
end_of_turn_analyzer: Optional[BaseEndOfTurnAnalyzer] = None
class BaseTransport(BaseObject):
class BaseTransport(ABC):
def __init__(
self,
*,
@@ -51,14 +51,54 @@ class BaseTransport(BaseObject):
input_name: Optional[str] = None,
output_name: Optional[str] = None,
):
super().__init__(name=name)
self._id: int = obj_id()
self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
self._input_name = input_name
self._output_name = output_name
self._event_handlers: dict = {}
@property
def id(self) -> int:
return self._id
@property
def name(self) -> str:
return self._name
@abstractmethod
def input(self) -> FrameProcessor:
pass
raise NotImplementedError
@abstractmethod
def output(self) -> FrameProcessor:
pass
raise NotImplementedError
def event_handler(self, event_name: str):
def decorator(handler):
self.add_event_handler(event_name, handler)
return handler
return decorator
def add_event_handler(self, event_name: str, handler):
if event_name not in self._event_handlers:
raise Exception(f"Event handler {event_name} not registered")
self._event_handlers[event_name].append(handler)
def _register_event_handler(self, event_name: str):
if event_name in self._event_handlers:
raise Exception(f"Event handler {event_name} already registered")
self._event_handlers[event_name] = []
async def _call_event_handler(self, event_name: str, *args, **kwargs):
try:
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.exception(f"Exception in event handler {event_name}: {e}")
def __str__(self):
return self.name

View File

@@ -195,10 +195,6 @@ class DailyMeetingTokenProperties(BaseModel):
default=None,
description="Start cloud recording when the user joins the room. This can be used to always record and archive meetings, for example in a customer support context.",
)
permissions: Optional[dict] = Field(
default=None,
description="Specifies the initial default permissions for a non-meeting-owner participant joining a call.",
)
class DailyMeetingTokenParams(BaseModel):

View File

@@ -1,58 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import inspect
from abc import ABC
from typing import Optional
from loguru import logger
from pipecat.utils.utils import obj_count, obj_id
class BaseObject(ABC):
def __init__(self, *, name: Optional[str] = None):
self._id: int = obj_id()
self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
self._event_handlers: dict = {}
@property
def id(self) -> int:
return self._id
@property
def name(self) -> str:
return self._name
def event_handler(self, event_name: str):
def decorator(handler):
self.add_event_handler(event_name, handler)
return handler
return decorator
def add_event_handler(self, event_name: str, handler):
if event_name not in self._event_handlers:
raise Exception(f"Event handler {event_name} not registered")
self._event_handlers[event_name].append(handler)
def _register_event_handler(self, event_name: str):
if event_name in self._event_handlers:
raise Exception(f"Event handler {event_name} already registered")
self._event_handlers[event_name] = []
async def _call_event_handler(self, event_name: str, *args, **kwargs):
try:
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.exception(f"Exception in event handler {event_name}: {e}")
def __str__(self):
return self.name

View File

@@ -8,8 +8,6 @@ class TestException(Exception):
class TestFrameProcessor(FrameProcessor):
__test__ = False # Prevents pytest from collecting this class as a test
def __init__(self, test_frames):
self.test_frames = test_frames
self._list_counter = 0

View File

@@ -1,96 +0,0 @@
import os
from unittest.mock import AsyncMock
import pytest
from dotenv import load_dotenv
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import (
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.services.google import GoogleLLMService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame, OpenAILLMService
from pipecat.utils.test_frame_processor import TestFrameProcessor
load_dotenv(override=True)
def standard_tools() -> ToolsSchema:
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location"],
)
tools_def = ToolsSchema(standard_tools=[weather_function])
return tools_def
async def _test_llm_function_calling(llm: LLMService):
# Create an AsyncMock for the function
mock_fetch_weather = AsyncMock()
llm.register_function(None, mock_fetch_weather)
t = TestFrameProcessor([LLMFullResponseStartFrame, LLMTextFrame, LLMFullResponseEndFrame])
llm.link(t)
messages = [
{
"role": "system",
"content": "You are a helpful assistant who can report the weather in any location in the universe. Respond concisely. Your response will be turned into speech so use only simple words and punctuation.",
},
{"role": "user", "content": " How is the weather today in San Francisco, California?"},
]
context = OpenAILLMContext(messages, standard_tools())
# This is done by default inside the create_context_aggregator
context.set_llm_adapter(llm.get_llm_adapter())
frame = OpenAILLMContextFrame(context)
# This will fail if an exception is raised
await llm.process_frame(frame, FrameDirection.DOWNSTREAM)
# Assert that the mock function was called
mock_fetch_weather.assert_called_once()
@pytest.mark.skipif(os.getenv("OPENAI_API_KEY") is None, reason="OPENAI_API_KEY is not set")
@pytest.mark.asyncio
async def test_unified_function_calling_openai():
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# This will fail if an exception is raised
await _test_llm_function_calling(llm)
@pytest.mark.skipif(os.getenv("GOOGLE_API_KEY") is None, reason="GOOGLE_API_KEY is not set")
@pytest.mark.asyncio
async def test_unified_function_calling_gemini():
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
# This will fail if an exception is raised
await _test_llm_function_calling(llm)
@pytest.mark.skipif(os.getenv("ANTHROPIC_API_KEY") is None, reason="ANTHROPIC_API_KEY is not set")
@pytest.mark.asyncio
async def test_unified_function_calling_anthropic():
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20240620"
)
# This will fail if an exception is raised
await _test_llm_function_calling(llm)

View File

@@ -1,176 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from openai.types.chat import ChatCompletionToolParam
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.adapters.services.anthropic_adapter import AnthropicLLMAdapter
from pipecat.adapters.services.gemini_adapter import GeminiLLMAdapter
from pipecat.adapters.services.open_ai_adapter import OpenAILLMAdapter
from pipecat.adapters.services.open_ai_realtime_adapter import OpenAIRealtimeLLMAdapter
class TestFunctionAdapters(unittest.TestCase):
def setUp(self) -> None:
"""Sets up a common tools schema for all tests."""
function_def = FunctionSchema(
name="get_weather",
description="Get the weather in a given location",
properties={
"location": {"type": "string", "description": "The city, e.g. San Francisco"},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use.",
},
},
required=["location", "format"],
)
self.tools_def = ToolsSchema(standard_tools=[function_def])
def test_openai_adapter(self):
"""Test OpenAI adapter format transformation."""
expected = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_weather",
"description": "Get the weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city, e.g. San Francisco",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use.",
},
},
"required": ["location", "format"],
},
},
)
]
assert OpenAILLMAdapter().to_provider_tools_format(self.tools_def) == expected
def test_anthropic_adapter(self):
"""Test Anthropic adapter format transformation."""
expected = [
{
"name": "get_weather",
"description": "Get the weather in a given location",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city, e.g. San Francisco",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use.",
},
},
"required": ["location", "format"],
},
}
]
assert AnthropicLLMAdapter().to_provider_tools_format(self.tools_def) == expected
def test_gemini_adapter(self):
"""Test Gemini adapter format transformation."""
expected = [
{
"function_declarations": [
{
"name": "get_weather",
"description": "Get the weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city, e.g. San Francisco",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use.",
},
},
"required": ["location", "format"],
},
}
]
}
]
assert GeminiLLMAdapter().to_provider_tools_format(self.tools_def) == expected
def test_openai_realtime_adapter(self):
"""Test Anthropic adapter format transformation."""
expected = [
{
"type": "function",
"name": "get_weather",
"description": "Get the weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city, e.g. San Francisco",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use.",
},
},
"required": ["location", "format"],
},
}
]
assert OpenAIRealtimeLLMAdapter().to_provider_tools_format(self.tools_def) == expected
def test_gemini_adapter_with_custom_tools(self):
"""Test Gemini adapter format transformation."""
search_tool = {"google_search": {}}
expected = [
{
"function_declarations": [
{
"name": "get_weather",
"description": "Get the weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city, e.g. San Francisco",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use.",
},
},
"required": ["location", "format"],
},
}
]
},
search_tool,
]
tools_def = self.tools_def
tools_def.custom_tools = {AdapterType.GEMINI: [search_tool]}
assert GeminiLLMAdapter().to_provider_tools_format(tools_def) == expected

View File

@@ -1,136 +0,0 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.frames.frames import (
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartInterruptionFrame,
)
from pipecat.processors.aggregators.llm_response import LLMFullResponseAggregator
from pipecat.tests.utils import SleepFrame, run_test
class TestLLMFullResponseAggregator(unittest.IsolatedAsyncioTestCase):
async def test_empty(self):
completion_ok = False
aggregator = LLMFullResponseAggregator()
@aggregator.event_handler("on_completion")
async def on_completion(aggregator, completion, completed):
nonlocal completion_ok
completion_ok = completion == "" and completed
frames_to_send = [LLMFullResponseStartFrame(), LLMFullResponseEndFrame()]
expected_down_frames = [LLMFullResponseStartFrame, LLMFullResponseEndFrame]
await run_test(
aggregator,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
assert completion_ok
async def test_simple(self):
completion_ok = False
aggregator = LLMFullResponseAggregator()
@aggregator.event_handler("on_completion")
async def on_completion(aggregator, completion, completed):
nonlocal completion_ok
completion_ok = completion == "Hello from Pipecat!" and completed
frames_to_send = [
LLMFullResponseStartFrame(),
LLMTextFrame("Hello from Pipecat!"),
LLMFullResponseEndFrame(),
]
expected_down_frames = [LLMFullResponseStartFrame, LLMTextFrame, LLMFullResponseEndFrame]
await run_test(
aggregator,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
assert completion_ok
async def test_multiple(self):
completion_ok = False
aggregator = LLMFullResponseAggregator()
@aggregator.event_handler("on_completion")
async def on_completion(aggregator, completion, completed):
nonlocal completion_ok
completion_ok = completion == "Hello from Pipecat!" and completed
frames_to_send = [
LLMFullResponseStartFrame(),
LLMTextFrame("Hello "),
LLMTextFrame("from "),
LLMTextFrame("Pipecat!"),
LLMFullResponseEndFrame(),
]
expected_down_frames = [
LLMFullResponseStartFrame,
LLMTextFrame,
LLMTextFrame,
LLMTextFrame,
LLMFullResponseEndFrame,
]
await run_test(
aggregator,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
assert completion_ok
async def test_interruption(self):
completion_ok = True
completion_result = [("Hello ", False), ("Hello there!", True)]
completion_index = 0
aggregator = LLMFullResponseAggregator()
@aggregator.event_handler("on_completion")
async def on_completion(aggregator, completion, completed):
nonlocal completion_result, completion_index, completion_ok
(completion_expected, completion_completed) = completion_result[completion_index]
completion_ok = (
completion_ok
and completion == completion_expected
and completed == completion_completed
)
completion_index += 1
frames_to_send = [
LLMFullResponseStartFrame(),
LLMTextFrame("Hello "),
SleepFrame(),
StartInterruptionFrame(),
LLMFullResponseStartFrame(),
LLMTextFrame("Hello "),
LLMTextFrame("there!"),
LLMFullResponseEndFrame(),
]
expected_down_frames = [
LLMFullResponseStartFrame,
LLMTextFrame,
StartInterruptionFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
LLMTextFrame,
LLMFullResponseEndFrame,
]
await run_test(
aggregator,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
assert completion_ok