Compare commits

..

1 Commits

Author SHA1 Message Date
James Hush
81d066074c Fix lint 2025-04-14 11:48:55 +08:00
130 changed files with 457 additions and 1864 deletions

View File

@@ -5,72 +5,6 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- Added support for Smart Turn Detection via the `turn_analyzer` transport
parameter. You can now choose between `SmartTurnAnalyzer()` for remote
inference or `LocalCoreMLSmartTurnAnalyzer()` for on-device inference using
Core ML.
- `DeepgramTTSService` accepts `base_url` argument again, allowing you to
connect to an on-prem service.
- Added `LLMUserAggregatorParams` and `LLMAssistantAggregatorParams` which allow
you to control aggregator settings. You can now pass these arguments when
creating aggregator pairs with `create_context_aggregator()`.
- Added `previous_text` context support to ElevenLabsHttpTTSService, improving
speech consistency across sentences within an LLM response.
- Added word/timestamp pairs to `ElevenLabsHttpTTSService`.
- It is now possible to disable `SoundfileMixer` when created. You can then use
`MixerEnableFrame` to dynamically enable it when necessary.
- Added `on_client_connected` and `on_client_disconnected` event handlers to
the `DailyTransport` class. These handlers map to the same underlying Daily
events as `on_participant_joined` and `on_participant_left`, respectively.
This makes it easier to write a single bot pipeline that can also use other
transports like `SmallWebRTCTransport` and `FastAPIWebsocketTransport`.
### Changed
- Daily's REST helpers now include an `eject_at_token_exp` param, which ejects
the user when their token expires. This new parameter defaults to False.
Also, the default value for `enable_prejoin_ui` changed to False and
`eject_at_room_exp` changed to False.
- `OpenAILLMService` and `OpenPipeLLMService` now use `gpt-4.1` as their
default model.
- `SoundfileMixer` constructor arguments need to be keywords.
### Deprecated
- `DeepgramSTTService` parameter `url` is now deprecated, use `base_url`
instead.
### Removed
- Parameters `user_kwargs` and `assistant_kwargs` when creating a context
aggregator pair using `create_context_aggregator()` have been removed. Use
`user_params` and `assistant_params` instead.
### Fixed
- Fixed an issue that would cause TTS websocket-based services to not cleanup
resources properly when disconnecting.
- Fixed a `TavusVideoService` issue that was causing audio choppiness.
- Fixed an issue in `SmallWebRTCTransport` where an error was thrown if the
client did not create a video transceiver.
- Fixed an issue where LLM input parameters were not working and applied correctly in `GoogleVertexLLMService`, causing
unexpected behavior during inference.
## [0.0.63] - 2025-04-11
### Added

View File

@@ -92,8 +92,4 @@ ASSEMBLYAI_API_KEY=...
OPENROUTER_API_KEY=...
# Piper
PIPER_BASE_URL=...
# Smart turn
LOCAL_SMART_TURN_MODEL_PATH=
REMOTE_SMART_TURN_URL=
PIPER_BASE_URL=...

View File

@@ -72,7 +72,7 @@ async def main():
# voice_id="gD1IexrzCvsXPHUuT0s3",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -95,7 +95,7 @@ async def main():
# voice_id="gD1IexrzCvsXPHUuT0s3",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -53,7 +53,7 @@ async def main(room_url: str, token: str):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -43,7 +43,7 @@ async def main(room_url: str, token: str):
api_key=os.getenv("CARTESIA_API_KEY", ""), voice_id="71a7ad14-091c-4e8e-a314-022ece01c121"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -141,7 +141,6 @@ async def dial(request: RoomRequest, raw_request: Request):
"display_name": request.From,
"sip_mode": "dial-in",
"num_endpoints": 2 if request.call_transfer is not None else 1,
"codecs": {"audio": ["OPUS"]},
}
daily_room_properties["sip"] = sip_config

View File

@@ -103,7 +103,6 @@ export default async function handler(req, res) {
display_name: From,
sip_mode: 'dial-in',
num_endpoints: call_transfer !== null ? 2 : 1,
codecs: {"audio": ["OPUS"]},
};
daily_room_properties.sip = sip_config;
}
@@ -173,4 +172,4 @@ export const config = {
sizeLimit: '1mb',
},
},
};
};

View File

@@ -61,7 +61,7 @@ async def main(room_url: str, token: str):
api_key=os.getenv("CARTESIA_API_KEY"), voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22"
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -9,11 +9,10 @@ import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, TranscriptionFrame, TTSSpeakFrame
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport

View File

@@ -38,7 +38,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -85,7 +85,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
# Create an HTTP session for API calls
async with aiohttp.ClientSession() as session:
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -93,7 +93,7 @@ async def main():
self.frame = frame
await self.push_frame(frame, direction)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaHttpTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -10,7 +10,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, MetricsFrame, TranscriptionFrame, TTSSpeakFrame
from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
@@ -32,30 +32,7 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
# Custom processor that prints a message if it receives a TranscriptionFrame that says "banana"
class BananaProcessor(FrameProcessor):
"""A custom processor that listens for transcription frames containing the word 'banana'."""
def __init__(self):
super().__init__()
async def process_frame(self, frame: Frame, direction: FrameDirection):
# Ensure the super method is called first
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
logger.debug(f"Received transcription frame: {frame.text}")
if "banana" in frame.text.lower():
logger.info("---- Received 'banana' in transcription frame")
# Push the frame after processing
await self.push_frame(frame)
class MetricsLogger(FrameProcessor):
def __init__(self):
super().__init__()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -96,7 +73,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
ml = MetricsLogger()
@@ -110,13 +87,10 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
banana = BananaProcessor()
pipeline = Pipeline(
[
transport.input(),
stt,
banana,
context_aggregator.user(),
llm,
tts,

View File

@@ -91,7 +91,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -45,7 +45,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -44,7 +44,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -74,7 +74,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
("human", "{input}"),
]
)
chain = prompt | ChatOpenAI(model="gpt-4.1", temperature=0.7)
chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)
history_chain = RunnableWithMessageHistory(
chain,
get_session_history,

View File

@@ -48,7 +48,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -42,7 +42,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -49,7 +49,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
aiohttp_session=session,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -45,7 +45,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -46,7 +46,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_url="s3://voice-cloning-zero-shot/d9ff78ba-d016-47f6-b0ef-dd630f59414e/female-cs/manifest.json",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -48,7 +48,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
params=PlayHTTTSService.InputParams(language=Language.EN),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -46,7 +46,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
tts = OpenAITTSService(api_key=os.getenv("OPENAI_API_KEY"), voice="ballad")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -50,6 +50,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
llm = OpenPipeLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
openpipe_api_key=os.getenv("OPENPIPE_API_KEY"),
model="gpt-4o",
tags={"conversation_id": f"pipecat-{timestamp}"},
)

View File

@@ -49,7 +49,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
base_url="http://localhost:8000",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -54,7 +54,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY", ""))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY", ""), model="gpt-4o")
messages = [
{

View File

@@ -42,7 +42,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
tts = LmntTTSService(api_key=os.getenv("LMNT_API_KEY"), voice_id="morgan")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -48,7 +48,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
params=PollyTTSService.InputParams(engine="neural", language="en-GB", rate="1.05"),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -47,7 +47,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -44,7 +44,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -49,7 +49,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
aiohttp_session=session,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -45,7 +45,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="rex",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -45,7 +45,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
model="4ce7e917cedd4bc2bb2e6ff3a46acaa1", # Barack Obama
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -45,7 +45,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="fc854436-2dac-4d21-aa69-ae17b54e98eb", # Emily
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -45,7 +45,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="fc854436-2dac-4d21-aa69-ae17b54e98eb", # Emily
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -47,7 +47,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -45,7 +45,7 @@ async def main():
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -47,7 +47,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -93,7 +93,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -74,7 +74,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# OpenAI GPT-4o for vision analysis
openai = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
openai = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -53,7 +53,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.

View File

@@ -82,7 +82,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)

View File

@@ -83,7 +83,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm.register_function("switch_voice", switch_voice)
tools = [

View File

@@ -73,7 +73,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="d4db5fb9-f44b-4bd1-85fa-192e0f0d75f9", # Spanish-speaking Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm.register_function("switch_language", switch_language)
tools = [

View File

@@ -6,6 +6,7 @@
import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger
@@ -39,101 +40,105 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Create an HTTP session
async with aiohttp.ClientSession() as session:
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramTTSService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-asteria-en",
base_url="http://0.0.0.0:8080",
)
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-asteria-en",
base_url="http://0.0.0.0:8080/v1/speak",
)
llm = OpenAILLMService(
# To use OpenAI
# api_key=os.getenv("OPENAI_API_KEY"),
# Or, to use a local vLLM (or similar) api server
model="meta-llama/Meta-Llama-3-8B-Instruct",
base_url="http://0.0.0.0:8000/v1",
)
llm = OpenAILLMService(
# To use OpenAI
# api_key=os.getenv("OPENAI_API_KEY"),
# model="gpt-4o"
# Or, to use a local vLLM (or similar) api server
model="meta-llama/Meta-Llama-3-8B-Instruct",
base_url="http://0.0.0.0:8000/v1",
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(),
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(),
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(),
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(),
]
)
# Handle "latency-ping" messages. The client will send app messages that look like
# this:
# { "latency-ping": { ts: <client-side timestamp> }}
#
# We want to send an immediate pong back to the client from this handler function.
# Also, we will push a frame into the top of the pipeline and send it after the
#
@transport.event_handler("on_app_message")
async def on_app_message(transport, message, sender):
try:
if "latency-ping" in message:
logger.debug(f"Received latency ping app message: {message}")
ts = message["latency-ping"]["ts"]
# Send immediately
transport.output().send_message(
DailyTransportMessageFrame(
message={"latency-pong-msg-handler": {"ts": ts}}, participant_id=sender
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
# Handle "latency-ping" messages. The client will send app messages that look like
# this:
# { "latency-ping": { ts: <client-side timestamp> }}
#
# We want to send an immediate pong back to the client from this handler function.
# Also, we will push a frame into the top of the pipeline and send it after the
#
@transport.event_handler("on_app_message")
async def on_app_message(transport, message, sender):
try:
if "latency-ping" in message:
logger.debug(f"Received latency ping app message: {message}")
ts = message["latency-ping"]["ts"]
# Send immediately
transport.output().send_message(
DailyTransportMessageFrame(
message={"latency-pong-msg-handler": {"ts": ts}}, participant_id=sender
)
)
)
# And push to the pipeline for the Daily transport.output to send
await task.queue_frame(
DailyTransportMessageFrame(
message={"latency-pong-pipeline-delivery": {"ts": ts}},
participant_id=sender,
# And push to the pipeline for the Daily transport.output to send
await task.queue_frame(
DailyTransportMessageFrame(
message={"latency-pong-pipeline-delivery": {"ts": ts}},
participant_id=sender,
)
)
)
except Exception as e:
logger.debug(f"message handling error: {e} - {message}")
except Exception as e:
logger.debug(f"message handling error: {e} - {message}")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":

View File

@@ -47,7 +47,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -185,7 +185,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)

View File

@@ -56,7 +56,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
# statement. This doesn't really need to be an LLM, we could use NLP
# libraries for that, but it was easier as an example because we
# leverage the context aggregators.
statement_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
statement_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
statement_messages = [
{
@@ -69,7 +69,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
statement_context_aggregator = statement_llm.create_context_aggregator(statement_context)
# This is the regular LLM.
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -224,10 +224,10 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
# This is the LLM that will be used to detect if the user has finished a
# statement. This doesn't really need to be an LLM, we could use NLP
# libraries for that, but we have the machinery to use an LLM, so we might as well!
statement_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
statement_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# This is the regular LLM.
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)

View File

@@ -428,10 +428,16 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
# This is the LLM that will be used to detect if the user has finished a
# statement. This doesn't really need to be an LLM, we could use NLP
# libraries for that, but we have the machinery to use an LLM, so we might as well!
statement_llm = AnthropicLLMService(api_key=os.getenv("ANTHROPIC_API_KEY"))
statement_llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20241022",
)
# This is the regular LLM.
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o",
)
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)

View File

@@ -33,10 +33,7 @@ from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMAssistantResponseAggregator,
)
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
@@ -481,7 +478,7 @@ class LLMAggregatorBuffer(LLMAssistantResponseAggregator):
"""Buffers the output of the transcription LLM. Used by the bot output gate."""
def __init__(self, **kwargs):
super().__init__(params=LLMAssistantAggregatorParams(expect_stripped_words=False))
super().__init__(expect_stripped_words=False)
self._transcription = ""
async def process_frame(self, frame: Frame, direction: FrameDirection):

View File

@@ -62,7 +62,7 @@ async def main():
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -4,13 +4,15 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Usage
"""
Usage
-----
Set the path to your background audio file using the `INPUT_AUDIO_PATH` environment variable, then run the bot using:
INPUT_AUDIO_PATH=path/to/your_audio.mp3 python 23-bot-background-sound.py
Example:
INPUT_AUDIO_PATH=my_audio.mp3 python 23-bot-background-sound.py
"""
@@ -69,7 +71,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -64,7 +64,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
llm.register_function("get_current_weather", fetch_weather_from_api)
weather_function = FunctionSchema(

View File

@@ -109,7 +109,10 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o",
)
messages = [
{

View File

@@ -127,7 +127,7 @@ async def main():
),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -88,7 +88,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -120,7 +120,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
)
# Initialize LLM
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# System prompt for storytelling with voice switching
system_prompt = """You are an engaging storyteller that uses different voices to bring stories to life.

View File

@@ -63,7 +63,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
# aiohttp_session=session,
# )
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# You can aslo register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("store_user_emails", store_user_emails)

View File

@@ -210,6 +210,10 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Get personalized greeting based on user memories. Can pass agent_id and run_id as per requirement of the application to manage short term memory or agent specific memory.
greeting = await get_initial_greeting(
memory_client=memory.memory_client, user_id=USER_ID, agent_id=None, run_id=None
@@ -221,10 +225,6 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
# Queue the context frame to start the conversation
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")

View File

@@ -1,111 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn import SmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Starting bot")
remote_smart_turn_url = os.getenv("REMOTE_SMART_TURN_URL")
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
vad_audio_passthrough=True,
turn_analyzer=SmartTurnAnalyzer(url=remote_smart_turn_url),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -1,129 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.local_smart_turn import LocalCoreMLSmartTurnAnalyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
async def run_bot(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Starting bot")
# To use this locally, set the environment variable LOCAL_SMART_TURN_MODEL_PATH
# to the path where the smart-turn repo is cloned.
#
# Example setup:
#
# # Git LFS (Large File Storage)
# brew install git-lfs
# # Hugging Face uses LFS to store large model files, including .mlpackage
# git lfs install
# # Clone the repo with the smart_turn_classifier.mlpackage
# git clone https://huggingface.co/pipecat-ai/smart-turn
#
# Then set the env variable:
# export LOCAL_SMART_TURN_MODEL_PATH=./smart-turn
# or add it to your .env file
smart_turn_model_path = os.getenv("LOCAL_SMART_TURN_MODEL_PATH")
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
vad_audio_passthrough=True,
turn_analyzer=LocalCoreMLSmartTurnAnalyzer(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
@transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info(f"Client closed connection")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)
if __name__ == "__main__":
from run import main
main()

View File

@@ -98,16 +98,14 @@ async def main():
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
# Kick off the conversation
await task.queue_frames([context_aggregator.user().get_context_frame()])
@daily_transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
logger.debug("First participant joined: {}", participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])
@daily_transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
logger.debug(f"Participant left: {participant}")
print(f"Participant left: {participant}")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)

View File

@@ -156,7 +156,7 @@ async def main():
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
ta = TalkingAnimation()

View File

@@ -148,13 +148,10 @@ async def main():
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
# Kick off the conversation
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
logger.debug("First participant joined: {}", participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):

View File

@@ -1,61 +0,0 @@
# SmallWebRTC and Daily
A Pipecat example demonstrating how to interoperate audio and video between `SmallWebRTCTransport` and `DailyTransport`.
## 🚀 Quick Start
### 1⃣ Start the Bot Server
#### 🔧 Set Up the Environment
1. Create and activate a virtual environment:
```bash
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Configure environment variables:
- Copy `env.example` to `.env`
```bash
cp env.example .env
```
- Add your API keys
#### ▶️ Run the Server
```bash
python server.py
```
### 1⃣ Connect the first client using Daily Prebuilt
- Open your browser and navigate to the same URL that you configured inside your `.env` file:
- `DAILY_SAMPLE_ROOM_URL`
### 2⃣ Connect the second client using SmallWebRTC Prebuilt UI
- Open your browser and navigate to:
👉 http://localhost:7860
- (Or use your custom port, if configured)
## ⚠️ Important Note
Ensure the bot server is running before using any client implementations.
## 📌 Requirements
- Python **3.10+**
- Node.js **16+** (for JavaScript components)
- Google API Key
- Modern web browser with WebRTC support
---
### 💡 Notes
- Ensure all dependencies are installed before running the server.
- Check the `.env` file for missing configurations.
- WebRTC requires a secure environment (HTTPS) for full functionality in production.
Happy coding! 🎉

View File

@@ -1,128 +0,0 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import sys
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import (
InputAudioRawFrame,
InputImageRawFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import Frame, FrameDirection, FrameProcessor
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MirrorProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, InputAudioRawFrame):
await self.push_frame(
OutputAudioRawFrame(
audio=frame.audio,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels,
)
)
elif isinstance(frame, InputImageRawFrame):
await self.push_frame(
OutputImageRawFrame(image=frame.image, size=frame.size, format=frame.format)
)
else:
await self.push_frame(frame, direction)
async def run_bot(webrtc_connection):
pipecat_transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
camera_in_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,
vad_enabled=False,
),
)
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
daily_transport = DailyTransport(
room_url,
None,
"SmallWebRTC",
params=DailyParams(
camera_in_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,
vad_enabled=False,
),
)
pipeline = Pipeline(
[
ParallelPipeline(
[
daily_transport.input(),
MirrorProcessor(),
pipecat_transport.output(),
],
[
pipecat_transport.input(),
MirrorProcessor(),
daily_transport.output(),
],
)
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=False,
),
)
@daily_transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
await transport.capture_participant_video(participant["id"])
@pipecat_transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Pipecat Client connected")
@pipecat_transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Pipecat Client disconnected")
@pipecat_transport.event_handler("on_client_closed")
async def on_client_closed(transport, client):
logger.info("Pipecat Client closed")
await task.cancel()
runner = PipelineRunner(handle_sigint=False)
await runner.run(task)

View File

@@ -1,2 +0,0 @@
DAILY_API_KEY=
DAILY_SAMPLE_ROOM_URL=

View File

@@ -1,5 +0,0 @@
python-dotenv
fastapi[all]
uvicorn
aiortc
pipecat-ai[silero, webrtc, daily]

View File

@@ -1,89 +0,0 @@
import argparse
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Dict
import uvicorn
from bot import run_bot
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import RedirectResponse
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
logger = logging.getLogger("pc")
app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = ["stun:stun.l.google.com:19302"]
# Mount the frontend at /
app.mount("/prebuilt", SmallWebRTCPrebuiltUI)
@app.get("/", include_in_schema=False)
async def root_redirect():
return RedirectResponse(url="/prebuilt/")
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
pc_id = request.get("pc_id")
if pc_id and pc_id in pcs_map:
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(
sdp=request["sdp"], type=request["type"], restart_pc=request.get("restart_pc", False)
)
else:
pipecat_connection = SmallWebRTCConnection(ice_servers)
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
background_tasks.add_task(run_bot, pipecat_connection)
answer = pipecat_connection.get_answer()
# Updating the peer connection inside the map
pcs_map[answer["pc_id"]] = pipecat_connection
return answer
@asynccontextmanager
async def lifespan(app: FastAPI):
yield # Run app
coros = [pc.close() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WebRTC demo")
parser.add_argument(
"--host", default="localhost", help="Host for HTTP server (default: localhost)"
)
parser.add_argument(
"--port", type=int, default=7860, help="Port for HTTP server (default: 7860)"
)
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -135,12 +135,12 @@ async def run_bot(webrtc_connection):
async def on_client_ready(rtvi):
logger.info("Pipecat client ready.")
await rtvi.set_bot_ready()
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@pipecat_transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Pipecat Client connected")
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@pipecat_transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -40,9 +40,7 @@
const createSmallWebRTCConnection = async (audioTrack) => {
const pc = new RTCPeerConnection()
pc.ontrack = e => audioEl.srcObject = e.streams[0]
// SmallWebRTCTransport expects to receive both transceivers
pc.addTransceiver(audioTrack, { direction: 'sendrecv' })
pc.addTransceiver('video', { direction: 'sendrecv' })
await pc.setLocalDescription(await pc.createOffer())
//await waitForIceGatheringComplete(pc)
const offer = pc.localDescription

View File

@@ -324,7 +324,7 @@ async def main():
# voice_id="846d6cb0-2301-48b6-9683-48f5618ea2f6", # Spanish-speaking Lady
# )
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = []
context = OpenAILLMContext(messages=messages)

View File

@@ -60,7 +60,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{

View File

@@ -305,7 +305,7 @@ async def main(
tools = ToolsSchema(standard_tools=[terminate_call_function, dial_operator_function])
# Initialize LLM
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# Register functions with the LLM
llm.register_function(

View File

@@ -129,7 +129,7 @@ async def main(
system_instruction = """You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. If the user ends the conversation, **IMMEDIATELY** call the `terminate_call` function. """
# Initialize LLM
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# Register functions with the LLM
llm.register_function("terminate_call", terminate_call)

View File

@@ -101,7 +101,7 @@ async def main(
system_instruction = """You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself. If the user ends the conversation, **IMMEDIATELY** call the `terminate_call` function. """
# Initialize LLM
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# Register functions with the LLM
llm.register_function("terminate_call", terminate_call)

View File

@@ -63,6 +63,7 @@ async def main():
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o",
metrics=SentryMetrics(),
)

View File

@@ -1,13 +1,13 @@
import { useRef, useCallback } from 'react';
import { useRef, useCallback } from "react";
import {
Participant,
RTVIEvent,
TransportState,
TranscriptData,
BotLLMTextData,
} from '@pipecat-ai/client-js';
import { useRTVIClient, useRTVIClientEvent } from '@pipecat-ai/client-react';
import './DebugDisplay.css';
} from "@pipecat-ai/client-js";
import { useRTVIClient, useRTVIClientEvent } from "@pipecat-ai/client-react";
import "./DebugDisplay.css";
export function DebugDisplay() {
const debugLogRef = useRef<HTMLDivElement>(null);
@@ -16,14 +16,14 @@ export function DebugDisplay() {
const log = useCallback((message: string) => {
if (!debugLogRef.current) return;
const entry = document.createElement('div');
const entry = document.createElement("div");
entry.textContent = `${new Date().toISOString()} - ${message}`;
// Add styling based on message type
if (message.startsWith('User: ')) {
entry.style.color = '#2196F3'; // blue for user
} else if (message.startsWith('Bot: ')) {
entry.style.color = '#4CAF50'; // green for bot
if (message.startsWith("User: ")) {
entry.style.color = "#2196F3"; // blue for user
} else if (message.startsWith("Bot: ")) {
entry.style.color = "#4CAF50"; // green for bot
}
debugLogRef.current.appendChild(entry);
@@ -68,7 +68,7 @@ export function DebugDisplay() {
useCallback(
(track: MediaStreamTrack, participant?: Participant) => {
log(
`Track started: ${track.kind} from ${participant?.name || 'unknown'}`
`Track started: ${track.kind} from ${participant?.name || "unknown"}`
);
},
[log]
@@ -76,11 +76,11 @@ export function DebugDisplay() {
);
useRTVIClientEvent(
RTVIEvent.TrackStopped,
RTVIEvent.TrackedStopped,
useCallback(
(track: MediaStreamTrack, participant?: Participant) => {
log(
`Track stopped: ${track.kind} from ${participant?.name || 'unknown'}`
`Track stopped: ${track.kind} from ${participant?.name || "unknown"}`
);
},
[log]
@@ -135,6 +135,16 @@ export function DebugDisplay() {
)
);
useRTVIClientEvent(
RTVIEvent.ServerMessage,
useCallback(
(data: unknown) => {
log(`Server Message: ${data}`);
},
[log]
)
);
return (
<div className="debug-panel">
<h3>Debug Info</h3>

View File

@@ -70,17 +70,3 @@ Run the server:
```bash
python server.py
```
## Troubleshooting
If you encounred this error:
```bash
aiohttp.client_exceptions.ClientConnectorCertificateError: Cannot connect to host api.daily.co:443 ssl:True [SSLCertVerificationError: (1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1000)')]
```
It's because Python cannot verify the SSL certificate from https://api.daily.co when making a POST request to create a room or token.
This is a common issue when the system doesn't have the proper CA certificates.
Install SSL Certificates (macOS): `/Applications/Python\ 3.12/Install\ Certificates.command`

View File

@@ -183,12 +183,11 @@ async def main():
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
# Kick off the conversation
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):

View File

@@ -40,7 +40,12 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.processors.frameworks.rtvi import (
RTVIConfig,
RTVIObserver,
RTVIProcessor,
RTVIServerMessageFrame,
)
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -90,15 +95,22 @@ class TalkingAnimation(FrameProcessor):
"""
await super().process_frame(frame, direction)
# Send a custom message to client
animation_frame = RTVIServerMessageFrame(
data={"type": "animation", "payload": {"is_talking": self._is_talking}}
)
# Switch to talking animation when bot starts speaking
if isinstance(frame, BotStartedSpeakingFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
self._is_talking = True
await self.push_frame(animation_frame)
# Return to static frame when bot stops speaking
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.push_frame(quiet_frame)
self._is_talking = False
await self.push_frame(animation_frame)
await self.push_frame(frame, direction)
@@ -155,7 +167,7 @@ async def main():
)
# Initialize LLM service
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
@@ -210,12 +222,11 @@ async def main():
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
# Kick off the conversation
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):

View File

@@ -48,7 +48,7 @@ async def run_bot(
),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

View File

@@ -150,7 +150,7 @@ async def main():
in_language = "English"
out_language = "Spanish"
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
context = OpenAILLMContext()
context_aggregator = llm.create_context_aggregator(context)

View File

@@ -68,7 +68,7 @@ async def run_bot(websocket_client: WebSocket, stream_sid: str, testing: bool):
),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True)

View File

@@ -98,7 +98,7 @@ async def run_client(client_name: str, server_url: str, duration_secs: int):
),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
# We let the audio passthrough so we can record the conversation.
stt = DeepgramSTTService(

View File

@@ -91,7 +91,7 @@ async def main():
)
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

View File

@@ -54,7 +54,7 @@ fal = [ "fal-client~=0.5.9" ]
fireworks = []
fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ]
gladia = [ "websockets~=13.1" ]
google = [ "google-cloud-speech~=2.31.1", "google-cloud-texttospeech~=2.25.1", "google-genai~=1.7.0", "google-generativeai~=0.8.4", "websockets~=13.1" ]
google = [ "google-cloud-speech~=2.31.1", "google-cloud-texttospeech~=2.25.1", "google-genai~=1.7.0", "google-generativeai~=0.8.4" ]
grok = []
groq = [ "groq~=0.20.0" ]
gstreamer = [ "pygobject~=3.50.0" ]
@@ -79,8 +79,6 @@ qwen = []
rime = [ "websockets~=13.1" ]
riva = [ "nvidia-riva-client~=2.19.0" ]
sentry = [ "sentry-sdk~=2.23.1" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch==2.5.0", "torchaudio==2.5.0" ]
remote-smart-turn = []
silero = [ "onnxruntime~=1.20.1" ]
simli = [ "simli-ai~=0.1.10"]
soundfile = [ "soundfile~=0.13.0" ]

View File

@@ -38,11 +38,9 @@ class SoundfileMixer(BaseAudioMixer):
def __init__(
self,
*,
sound_files: Mapping[str, str],
default_sound: str,
volume: float = 0.4,
mixing: bool = True,
loop: bool = True,
**kwargs,
):
@@ -54,7 +52,7 @@ class SoundfileMixer(BaseAudioMixer):
self._sound_pos = 0
self._sounds: Dict[str, Any] = {}
self._current_sound = default_sound
self._mixing = mixing
self._mixing = True
self._loop = loop
async def start(self, sample_rate: int):

View File

@@ -1,168 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import time
from abc import abstractmethod
from typing import Dict, Optional
import numpy as np
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
# Default timing parameters
STOP_SECS = 3
PRE_SPEECH_MS = 0
MAX_DURATION_SECONDS = 8 # Max allowed segment duration
USE_ONLY_LAST_VAD_SEGMENT = True
class SmartTurnParams(BaseModel):
stop_secs: float = STOP_SECS
pre_speech_ms: float = PRE_SPEECH_MS
max_duration_secs: float = MAX_DURATION_SECONDS
# not exposing this for now yet until the model can handle it.
# use_only_last_vad_segment: bool = USE_ONLY_LAST_VAD_SEGMENT
class BaseSmartTurn(BaseTurnAnalyzer):
def __init__(
self, *, sample_rate: Optional[int] = None, params: SmartTurnParams = SmartTurnParams()
):
super().__init__(sample_rate=sample_rate)
self._params = params
# Configuration
self._stop_ms = self._params.stop_secs * 1000 # silence threshold in ms
# Inference state
self._audio_buffer = []
self._speech_triggered = False
self._silence_ms = 0
self._speech_start_time = None
@property
def speech_triggered(self) -> bool:
return self._speech_triggered
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
# Convert raw audio to float32 format and append to the buffer
audio_int16 = np.frombuffer(buffer, dtype=np.int16)
audio_float32 = np.frombuffer(audio_int16, dtype=np.int16).astype(np.float32) / 32768.0
self._audio_buffer.append((time.time(), audio_float32))
state = EndOfTurnState.INCOMPLETE
if is_speech:
# Reset silence tracking on speech
self._silence_ms = 0
self._speech_triggered = True
if self._speech_start_time is None:
self._speech_start_time = time.time()
logger.debug(f"Speech started at {self._speech_start_time}")
else:
if self._speech_triggered:
chunk_duration_ms = len(audio_int16) / (self._sample_rate / 1000)
self._silence_ms += chunk_duration_ms
# If silence exceeds threshold, mark end of turn
if self._silence_ms >= self._stop_ms:
logger.debug(
f"End of Turn complete due to stop_secs. Silence in ms: {self._silence_ms}"
)
state = EndOfTurnState.COMPLETE
self._clear(state)
else:
# Trim buffer to prevent unbounded growth before speech
max_buffer_time = (
(self._params.pre_speech_ms / 1000)
+ self._params.stop_secs
+ self._params.max_duration_secs
)
while (
self._audio_buffer and self._audio_buffer[0][0] < time.time() - max_buffer_time
):
self._audio_buffer.pop(0)
return state
def analyze_end_of_turn(self) -> EndOfTurnState:
logger.debug("Analyzing End of Turn...")
state = self._process_speech_segment(self._audio_buffer)
if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
self._clear(state)
logger.debug(f"End of Turn result: {state}")
return state
def _clear(self, turn_state: EndOfTurnState):
# Reset internal state for next turn
logger.debug("Clearing audio buffer...")
# If the state is still incomplete, keep the _speech_triggered as True
self._speech_triggered = turn_state == EndOfTurnState.INCOMPLETE
self._audio_buffer = []
self._speech_start_time = None
self._silence_ms = 0
def _process_speech_segment(self, audio_buffer) -> EndOfTurnState:
state = EndOfTurnState.INCOMPLETE
if not audio_buffer:
return state
# Extract recent audio segment for prediction
start_time = self._speech_start_time - (self._params.pre_speech_ms / 1000)
start_index = 0
for i, (t, _) in enumerate(audio_buffer):
if t >= start_time:
start_index = i
break
end_index = len(audio_buffer) - 1
# Extract the audio segment
segment_audio_chunks = [chunk for _, chunk in audio_buffer[start_index : end_index + 1]]
segment_audio = np.concatenate(segment_audio_chunks)
logger.debug(f"Segment audio chunks after start index: {len(segment_audio)}")
# Limit maximum duration
max_samples = int(self._params.max_duration_secs * self.sample_rate)
if len(segment_audio) > max_samples:
# slices the array to keep the last max_samples samples, discarding the earlier part.
segment_audio = segment_audio[-max_samples:]
logger.debug(f"Segment audio chunks after limiting duration: {len(segment_audio)}")
if len(segment_audio) > 0:
start_time = time.perf_counter()
result = self._predict_endpoint(segment_audio)
state = (
EndOfTurnState.COMPLETE if result["prediction"] == 1 else EndOfTurnState.INCOMPLETE
)
end_time = time.perf_counter()
logger.debug("--------")
logger.debug(f"Prediction: {'Complete' if result['prediction'] == 1 else 'Incomplete'}")
logger.debug(f"Probability of complete: {result['probability']:.4f}")
logger.debug(f"Prediction took {(end_time - start_time) * 1000:.2f}ms seconds")
else:
logger.debug(f"params: {self._params}, stop_ms: {self._stop_ms}")
logger.debug("Captured empty audio segment, skipping prediction.")
return state
@abstractmethod
def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, any]:
"""
Abstract method to predict if a turn has ended based on audio.
Args:
buffer: Float32 numpy array of audio samples at 16kHz.
Returns:
Dictionary with:
- prediction: 1 if turn is complete, else 0
- probability: Confidence of the prediction
"""
pass

View File

@@ -1,81 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional
class EndOfTurnState(Enum):
COMPLETE = 1
INCOMPLETE = 2
class BaseTurnAnalyzer(ABC):
"""
Abstract base class for analyzing user end of turn.
"""
def __init__(self, *, sample_rate: Optional[int] = None):
self._init_sample_rate = sample_rate
self._sample_rate = 0
@property
def sample_rate(self) -> int:
"""
Returns the current sample rate.
Returns:
int: The effective sample rate for audio processing.
"""
return self._sample_rate
def set_sample_rate(self, sample_rate: int):
"""
Sets the sample rate for audio processing.
If the initial sample rate was provided, it will use that; otherwise, it sets to
the provided sample rate.
Args:
sample_rate (int): The sample rate to set.
"""
self._sample_rate = self._init_sample_rate or sample_rate
@property
@abstractmethod
def speech_triggered(self) -> bool:
"""
Determines if speech has been detected.
Returns:
bool: True if speech is triggered, otherwise False.
"""
pass
@abstractmethod
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
"""
Appends audio data for analysis.
Args:
buffer (bytes): The audio data to append.
is_speech (bool): Indicates whether the appended audio is speech or not.
Returns:
EndOfTurnState: The resulting state after appending the audio.
"""
pass
@abstractmethod
def analyze_end_of_turn(self) -> EndOfTurnState:
"""
Analyzes if an end of turn has occurred based on the audio input.
Returns:
EndOfTurnState: The result of the end of turn analysis.
"""
pass

View File

@@ -1,65 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from typing import Dict
import numpy as np
import torch
from loguru import logger
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
try:
import coremltools as ct
from transformers import AutoFeatureExtractor
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use the LocalSmartTurnAnalyzer, you need to `pip install pipecat-ai[local-smart-turn]`."
)
raise Exception(f"Missing module: {e}")
class LocalCoreMLSmartTurnAnalyzer(BaseSmartTurn):
def __init__(self, smart_turn_model_path: str, **kwargs):
super().__init__(**kwargs)
if not smart_turn_model_path:
logger.error("smart_turn_model_path is not set.")
raise Exception("smart_turn_model_path must be provided.")
core_ml_model_path = f"{smart_turn_model_path}/coreml/smart_turn_classifier.mlpackage"
logger.debug("Loading Local Smart Turn model...")
# Only load the processor, not the torch model
self._turn_processor = AutoFeatureExtractor.from_pretrained(smart_turn_model_path)
self._turn_model = ct.models.MLModel(core_ml_model_path)
logger.debug("Loaded Local Smart Turn")
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
inputs = self._turn_processor(
audio_array,
sampling_rate=16000,
padding="max_length",
truncation=True,
max_length=800, # Maximum length as specified in training
return_attention_mask=True,
return_tensors="pt",
)
output = self._turn_model.predict(dict(inputs))
logits = output["logits"] # Core ML returns numpy array
logits_tensor = torch.tensor(logits)
probabilities = torch.nn.functional.softmax(logits_tensor, dim=1)
completion_prob = probabilities[0, 1].item() # Probability of class 1 (Complete)
prediction = 1 if completion_prob > 0.5 else 0
return {
"prediction": prediction,
"probability": completion_prob,
}

View File

@@ -1,75 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import io
import os
from typing import Dict
import numpy as np
import requests
from loguru import logger
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
class SmartTurnAnalyzer(BaseSmartTurn):
def __init__(self, url: str, **kwargs):
super().__init__(**kwargs)
self.remote_smart_turn_url = url
if not self.remote_smart_turn_url:
logger.error("remote_smart_turn_url is not set.")
raise Exception("remote_smart_turn_url must be provided.")
# Use a session to reuse connections (keep-alive)
self.session = requests.Session()
self.session.headers.update({"Connection": "keep-alive"})
def _serialize_array(self, audio_array: np.ndarray) -> bytes:
logger.trace("Serializing NumPy array to bytes...")
buffer = io.BytesIO()
np.save(buffer, audio_array)
serialized_bytes = buffer.getvalue()
logger.trace(f"Serialized size: {len(serialized_bytes)} bytes")
return serialized_bytes
def _send_raw_request(self, data_bytes: bytes):
headers = {"Content-Type": "application/octet-stream"}
logger.trace(
f"Sending {len(data_bytes)} bytes as raw body to {self.remote_smart_turn_url}..."
)
try:
response = self.session.post(
self.remote_smart_turn_url,
data=data_bytes,
headers=headers,
timeout=60,
)
logger.trace("\n--- Response ---")
logger.trace(f"Status Code: {response.status_code}")
if response.ok:
try:
logger.trace("Response JSON:")
logger.trace(response.json())
return response.json()
except requests.exceptions.JSONDecodeError:
logger.trace("Response Content (non-JSON):")
logger.trace(response.text)
else:
logger.trace("Response Content (Error):")
logger.trace(response.text)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"Failed to send raw request to Daily Smart Turn: {e}")
raise Exception("Failed to send raw request to Daily Smart Turn.")
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
serialized_array = self._serialize_array(audio_array)
return self._send_raw_request(serialized_array)

View File

@@ -6,7 +6,6 @@
import asyncio
from abc import abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Literal, Set
from loguru import logger
@@ -47,16 +46,6 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.time import time_now_iso8601
@dataclass
class LLMUserAggregatorParams:
aggregation_timeout: float = 1.0
@dataclass
class LLMAssistantAggregatorParams:
expect_stripped_words: bool = True
class LLMFullResponseAggregator(FrameProcessor):
"""This is an LLM aggregator that aggregates a full LLM completion. It
aggregates LLM text frames (tokens) received between
@@ -241,23 +230,11 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
def __init__(
self,
context: OpenAILLMContext,
*,
params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
aggregation_timeout: float = 1.0,
**kwargs,
):
super().__init__(context=context, role="user", **kwargs)
self._params = params
if "aggregation_timeout" in kwargs:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Parameter 'aggregation_timeout' is deprecated, use 'params' instead.",
DeprecationWarning,
)
self._params.aggregation_timeout = kwargs["aggregation_timeout"]
self._aggregation_timeout = aggregation_timeout
self._seen_interim_results = False
self._user_speaking = False
@@ -380,9 +357,7 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
async def _aggregation_task_handler(self):
while True:
try:
await asyncio.wait_for(
self._aggregation_event.wait(), self._params.aggregation_timeout
)
await asyncio.wait_for(self._aggregation_event.wait(), self._aggregation_timeout)
await self._maybe_push_bot_interruption()
except asyncio.TimeoutError:
if not self._user_speaking:
@@ -419,27 +394,9 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
"""
def __init__(
self,
context: OpenAILLMContext,
*,
params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
**kwargs,
):
def __init__(self, context: OpenAILLMContext, *, expect_stripped_words: bool = True, **kwargs):
super().__init__(context=context, role="assistant", **kwargs)
self._params = params
if "expect_stripped_words" in kwargs:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Parameter 'expect_stripped_words' is deprecated, use 'params' instead.",
DeprecationWarning,
)
self._params.expect_stripped_words = kwargs["expect_stripped_words"]
self._expect_stripped_words = expect_stripped_words
self._started = 0
self._function_calls_in_progress: Dict[str, FunctionCallInProgressFrame] = {}
@@ -601,7 +558,7 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
if not self._started:
return
if self._params.expect_stripped_words:
if self._expect_stripped_words:
self._aggregation += f" {frame.text}" if self._aggregation else frame.text
else:
self._aggregation += frame.text
@@ -615,14 +572,8 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
class LLMUserResponseAggregator(LLMUserContextAggregator):
def __init__(
self,
messages: List[dict] = [],
*,
params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
**kwargs,
):
super().__init__(context=OpenAILLMContext(messages), params=params, **kwargs)
def __init__(self, messages: List[dict] = [], **kwargs):
super().__init__(context=OpenAILLMContext(messages), **kwargs)
async def push_aggregation(self):
if len(self._aggregation) > 0:
@@ -637,14 +588,8 @@ class LLMUserResponseAggregator(LLMUserContextAggregator):
class LLMAssistantResponseAggregator(LLMAssistantContextAggregator):
def __init__(
self,
messages: List[dict] = [],
*,
params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
**kwargs,
):
super().__init__(context=OpenAILLMContext(messages), params=params, **kwargs)
def __init__(self, messages: List[dict] = [], **kwargs):
super().__init__(context=OpenAILLMContext(messages), **kwargs)
async def push_aggregation(self):
if len(self._aggregation) > 0:

View File

@@ -1 +0,0 @@

View File

@@ -11,7 +11,7 @@ import io
import json
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Mapping, Optional, Union
import httpx
from loguru import logger
@@ -35,9 +35,7 @@ from pipecat.frames.frames import (
)
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMAssistantContextAggregator,
LLMUserAggregatorParams,
LLMUserContextAggregator,
)
from pipecat.processors.aggregators.openai_llm_context import (
@@ -51,7 +49,10 @@ try:
from anthropic import NOT_GIVEN, AsyncAnthropic, NotGiven
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Anthropic, you need to `pip install pipecat-ai[anthropic]`.")
logger.error(
"In order to use Anthropic, you need to `pip install pipecat-ai[anthropic]`. "
+ "Also, set `ANTHROPIC_API_KEY` environment variable."
)
raise Exception(f"Missing module: {e}")
@@ -119,8 +120,8 @@ class AnthropicLLMService(LLMService):
self,
context: OpenAILLMContext,
*,
user_params: LLMUserAggregatorParams = LLMUserAggregatorParams(),
assistant_params: LLMAssistantAggregatorParams = LLMAssistantAggregatorParams(),
user_kwargs: Mapping[str, Any] = {},
assistant_kwargs: Mapping[str, Any] = {},
) -> AnthropicContextAggregatorPair:
"""Create an instance of AnthropicContextAggregatorPair from an
OpenAILLMContext. Constructor keyword arguments for both the user and
@@ -128,10 +129,12 @@ class AnthropicLLMService(LLMService):
Args:
context (OpenAILLMContext): The LLM context.
user_params (LLMUserAggregatorParams, optional): User aggregator
parameters.
assistant_params (LLMAssistantAggregatorParams, optional): User
aggregator parameters.
user_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the user context aggregator constructor. Defaults
to an empty mapping.
assistant_kwargs (Mapping[str, Any], optional): Additional keyword
arguments for the assistant context aggregator
constructor. Defaults to an empty mapping.
Returns:
AnthropicContextAggregatorPair: A pair of context aggregators, one
@@ -143,8 +146,8 @@ class AnthropicLLMService(LLMService):
if isinstance(context, OpenAILLMContext):
context = AnthropicLLMContext.from_openai_context(context)
user = AnthropicUserContextAggregator(context, params=user_params)
assistant = AnthropicAssistantContextAggregator(context, params=assistant_params)
user = AnthropicUserContextAggregator(context, **user_kwargs)
assistant = AnthropicAssistantContextAggregator(context, **assistant_kwargs)
return AnthropicContextAggregatorPair(_user=user, _assistant=assistant)
async def _process_context(self, context: OpenAILLMContext):

View File

@@ -231,9 +231,9 @@ class PollyTTSService(TTSService):
yield TTSStartedFrame()
CHUNK_SIZE = 1024
for i in range(0, len(audio_data), CHUNK_SIZE):
chunk = audio_data[i : i + CHUNK_SIZE]
chunk_size = 8192
for i in range(0, len(audio_data), chunk_size):
chunk = audio_data[i : i + chunk_size]
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)

Some files were not shown because too many files have changed in this diff Show More