Compare commits
54 Commits
hush/spell
...
hush/custo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dddfd791e1 | ||
|
|
e721c2086c | ||
|
|
f77d8f0b6f | ||
|
|
9c0beb05cf | ||
|
|
858981c404 | ||
|
|
9eed225aa2 | ||
|
|
9f7371e485 | ||
|
|
004a920920 | ||
|
|
203c5a3a60 | ||
|
|
7f6fb1754b | ||
|
|
a390ce13a4 | ||
|
|
61d31d1c40 | ||
|
|
e872ff943a | ||
|
|
c71005e249 | ||
|
|
6e06bf97c0 | ||
|
|
a80dc94e91 | ||
|
|
3ea9cfd251 | ||
|
|
a80f82cdb6 | ||
|
|
d24bab354f | ||
|
|
53ee3fb64c | ||
|
|
3599761e4e | ||
|
|
c0b3fe3985 | ||
|
|
497d48b6c8 | ||
|
|
e179916c9c | ||
|
|
b0b38beb19 | ||
|
|
8577139d21 | ||
|
|
e2fbbb4b40 | ||
|
|
88ce117e84 | ||
|
|
266537c3f4 | ||
|
|
230d2f80fa | ||
|
|
3f0688aefa | ||
|
|
5be3e6979e | ||
|
|
a458c1e92b | ||
|
|
5bbf1d0209 | ||
|
|
8e36bdbed7 | ||
|
|
cd8bd7f487 | ||
|
|
5fa47b7a5c | ||
|
|
616961b487 | ||
|
|
650d4d9ee2 | ||
|
|
2627cb6bf2 | ||
|
|
0e4115049b | ||
|
|
3ebef9346f | ||
|
|
3e2d21779f | ||
|
|
cfefcac35f | ||
|
|
57b39c084f | ||
|
|
11b6de0900 | ||
|
|
824bc9bf16 | ||
|
|
d0ddef6c12 | ||
|
|
e6325a8229 | ||
|
|
3588b06718 | ||
|
|
73874f6ec0 | ||
|
|
6ab9a8ad7f | ||
|
|
821e303249 | ||
|
|
61cba0136f |
@@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Added support for Smart Turn Detection via the `turn_analyzer` transport
|
||||
parameter. You can now choose between `SmartTurnAnalyzer()` for remote
|
||||
inference or `LocalCoreMLSmartTurnAnalyzer()` for on-device inference using
|
||||
Core ML.
|
||||
|
||||
- `DeepgramTTSService` accepts `base_url` argument again, allowing you to
|
||||
connect to an on-prem service.
|
||||
|
||||
@@ -55,6 +60,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue that would cause TTS websocket-based services to not clean up
|
||||
resources properly when disconnecting.
|
||||
|
||||
- Fixed a `TavusVideoService` issue that was causing audio choppiness.
|
||||
|
||||
- Fixed an issue in `SmallWebRTCTransport` where an error was thrown if the
|
||||
|
||||
@@ -92,4 +92,8 @@ ASSEMBLYAI_API_KEY=...
|
||||
OPENROUTER_API_KEY=...
|
||||
|
||||
# Piper
|
||||
PIPER_BASE_URL=...
|
||||
PIPER_BASE_URL=...
|
||||
|
||||
# Smart turn
|
||||
LOCAL_SMART_TURN_MODEL_PATH=
|
||||
REMOTE_SMART_TURN_URL=
|
||||
@@ -141,6 +141,7 @@ async def dial(request: RoomRequest, raw_request: Request):
|
||||
"display_name": request.From,
|
||||
"sip_mode": "dial-in",
|
||||
"num_endpoints": 2 if request.call_transfer is not None else 1,
|
||||
"codecs": {"audio": ["OPUS"]},
|
||||
}
|
||||
daily_room_properties["sip"] = sip_config
|
||||
|
||||
|
||||
@@ -103,6 +103,7 @@ export default async function handler(req, res) {
|
||||
display_name: From,
|
||||
sip_mode: 'dial-in',
|
||||
num_endpoints: call_transfer !== null ? 2 : 1,
|
||||
codecs: {"audio": ["OPUS"]},
|
||||
};
|
||||
daily_room_properties.sip = sip_config;
|
||||
}
|
||||
@@ -172,4 +173,4 @@ export const config = {
|
||||
sizeLimit: '1mb',
|
||||
},
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
@@ -9,10 +9,11 @@ import os
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
|
||||
from pipecat.frames.frames import EndFrame, TranscriptionFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
|
||||
@@ -10,7 +10,7 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, MetricsFrame
|
||||
from pipecat.frames.frames import Frame, MetricsFrame, TranscriptionFrame, TTSSpeakFrame
|
||||
from pipecat.metrics.metrics import (
|
||||
LLMUsageMetricsData,
|
||||
ProcessingMetricsData,
|
||||
@@ -32,7 +32,30 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
# Custom processor that prints a message if it receives a TranscriptionFrame that says "banana"
|
||||
class BananaProcessor(FrameProcessor):
    """A custom processor that listens for transcription frames containing the word 'banana'.

    Every frame is forwarded unchanged, in the direction it was travelling; transcription
    frames are additionally logged, with an extra log line when they mention "banana".
    """

    def __init__(self):
        super().__init__()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Log transcriptions and pass the frame along.

        Args:
            frame: The frame received by this processor.
            direction: The direction the frame is travelling in the pipeline.
        """
        # Ensure the super method is called first
        await super().process_frame(frame, direction)

        if isinstance(frame, TranscriptionFrame):
            logger.debug(f"Received transcription frame: {frame.text}")
            if "banana" in frame.text.lower():
                logger.info("---- Received 'banana' in transcription frame")

        # Forward with the original direction so frames travelling upstream are not
        # turned around and sent downstream.
        await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class MetricsLogger(FrameProcessor):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
@@ -87,10 +110,13 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection):
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
banana = BananaProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
banana,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
|
||||
111
examples/foundational/38-smart-turn.py
Normal file
111
examples/foundational/38-smart-turn.py
Normal file
@@ -0,0 +1,111 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.smart_turn import SmartTurnAnalyzer
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def run_bot(webrtc_connection: SmallWebRTCConnection):
    """Run a voice bot on a SmallWebRTC connection using the remote smart-turn analyzer.

    The transport's turn analyzer is a `SmartTurnAnalyzer` pointed at the URL read from
    the `REMOTE_SMART_TURN_URL` environment variable. Audio flows through
    STT -> context aggregation -> LLM -> TTS and back out through the transport.

    Args:
        webrtc_connection: The peer connection the transport is built on.
    """
    logger.info("Starting bot")

    smart_turn_endpoint = os.getenv("REMOTE_SMART_TURN_URL")

    # Built in the same order as the nested constructor calls they replace.
    vad = SileroVADAnalyzer(params=VADParams(stop_secs=0.2))
    turn_detector = SmartTurnAnalyzer(url=smart_turn_endpoint)
    webrtc_transport = SmallWebRTCTransport(
        webrtc_connection=webrtc_connection,
        params=TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_enabled=True,
            vad_analyzer=vad,
            vad_audio_passthrough=True,
            turn_analyzer=turn_detector,
        ),
    )

    speech_to_text = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    text_to_speech = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    language_model = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    system_prompt = {
        "role": "system",
        "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
    }
    messages = [system_prompt]

    aggregator = language_model.create_context_aggregator(OpenAILLMContext(messages))

    stages = [
        webrtc_transport.input(),  # Transport user input
        speech_to_text,
        aggregator.user(),  # User responses
        language_model,  # LLM
        text_to_speech,  # TTS
        webrtc_transport.output(),  # Transport bot output
        aggregator.assistant(),  # Assistant spoken responses
    ]

    task_params = PipelineParams(
        allow_interruptions=True,
        enable_metrics=True,
        enable_usage_metrics=True,
        report_only_initial_ttfb=True,
    )
    task = PipelineTask(Pipeline(stages), params=task_params)

    @webrtc_transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([aggregator.user().get_context_frame()])

    @webrtc_transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")

    @webrtc_transport.event_handler("on_client_closed")
    async def on_client_closed(transport, client):
        logger.info("Client closed connection")
        await task.cancel()

    await PipelineRunner(handle_sigint=False).run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # The example runner is imported only when this file is executed as a script.
    from run import main as run_example

    run_example()
|
||||
129
examples/foundational/38a-local-smart-turn.py
Normal file
129
examples/foundational/38a-local-smart-turn.py
Normal file
@@ -0,0 +1,129 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.base_smart_turn import SmartTurnParams
|
||||
from pipecat.audio.turn.local_smart_turn import LocalCoreMLSmartTurnAnalyzer
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def run_bot(webrtc_connection: SmallWebRTCConnection):
    """Run a voice bot on a SmallWebRTC connection using the local Core ML smart-turn analyzer.

    The model location is read from the `LOCAL_SMART_TURN_MODEL_PATH` environment variable
    and handed to `LocalCoreMLSmartTurnAnalyzer`. Audio flows through
    STT -> context aggregation -> LLM -> TTS and back out through the transport.

    Args:
        webrtc_connection: The peer connection the transport is built on.
    """
    logger.info("Starting bot")

    # To use this locally, set the environment variable LOCAL_SMART_TURN_MODEL_PATH
    # to the path where the smart-turn repo is cloned.
    #
    # Example setup:
    #
    #   # Git LFS (Large File Storage)
    #   brew install git-lfs
    #   # Hugging Face uses LFS to store large model files, including .mlpackage
    #   git lfs install
    #   # Clone the repo with the smart_turn_classifier.mlpackage
    #   git clone https://huggingface.co/pipecat-ai/smart-turn
    #
    # Then set the env variable:
    #   export LOCAL_SMART_TURN_MODEL_PATH=./smart-turn
    # or add it to your .env file
    model_dir = os.getenv("LOCAL_SMART_TURN_MODEL_PATH")

    # Built in the same order as the nested constructor calls they replace.
    vad = SileroVADAnalyzer(params=VADParams(stop_secs=0.2))
    turn_detector = LocalCoreMLSmartTurnAnalyzer(
        smart_turn_model_path=model_dir, params=SmartTurnParams()
    )
    webrtc_transport = SmallWebRTCTransport(
        webrtc_connection=webrtc_connection,
        params=TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_enabled=True,
            vad_analyzer=vad,
            vad_audio_passthrough=True,
            turn_analyzer=turn_detector,
        ),
    )

    speech_to_text = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    text_to_speech = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
    )

    language_model = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    system_prompt = {
        "role": "system",
        "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
    }
    messages = [system_prompt]

    aggregator = language_model.create_context_aggregator(OpenAILLMContext(messages))

    stages = [
        webrtc_transport.input(),  # Transport user input
        speech_to_text,
        aggregator.user(),  # User responses
        language_model,  # LLM
        text_to_speech,  # TTS
        webrtc_transport.output(),  # Transport bot output
        aggregator.assistant(),  # Assistant spoken responses
    ]

    task_params = PipelineParams(
        allow_interruptions=True,
        enable_metrics=True,
        enable_usage_metrics=True,
        report_only_initial_ttfb=True,
    )
    task = PipelineTask(Pipeline(stages), params=task_params)

    @webrtc_transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([aggregator.user().get_context_frame()])

    @webrtc_transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")

    @webrtc_transport.event_handler("on_client_closed")
    async def on_client_closed(transport, client):
        logger.info("Client closed connection")
        await task.cancel()

    await PipelineRunner(handle_sigint=False).run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # The example runner is imported only when this file is executed as a script.
    from run import main as run_example

    run_example()
|
||||
61
examples/p2p-webrtc/daily-interop-bridge/README.md
Normal file
61
examples/p2p-webrtc/daily-interop-bridge/README.md
Normal file
@@ -0,0 +1,61 @@
|
||||
# SmallWebRTC and Daily
|
||||
|
||||
A Pipecat example demonstrating how to interoperate audio and video between `SmallWebRTCTransport` and `DailyTransport`.
|
||||
|
||||
## 🚀 Quick Start
|
||||
|
||||
### 1️⃣ Start the Bot Server
|
||||
|
||||
#### 🔧 Set Up the Environment
|
||||
1. Create and activate a virtual environment:
|
||||
```bash
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate # On Windows: venv\Scripts\activate
|
||||
```
|
||||
|
||||
2. Install dependencies:
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
3. Configure environment variables:
|
||||
- Copy `env.example` to `.env`
|
||||
```bash
|
||||
cp env.example .env
|
||||
```
|
||||
- Add your API keys
|
||||
|
||||
#### ▶️ Run the Server
|
||||
```bash
|
||||
python server.py
|
||||
```
|
||||
|
||||
### 2️⃣ Connect the first client using Daily Prebuilt
|
||||
|
||||
- Open your browser and navigate to the same URL that you configured inside your `.env` file:
|
||||
- `DAILY_SAMPLE_ROOM_URL`
|
||||
|
||||
### 3️⃣ Connect the second client using SmallWebRTC Prebuilt UI
|
||||
|
||||
- Open your browser and navigate to:
|
||||
👉 http://localhost:7860
|
||||
- (Or use your custom port, if configured)
|
||||
|
||||
## ⚠️ Important Note
|
||||
Ensure the bot server is running before using any client implementations.
|
||||
|
||||
## 📌 Requirements
|
||||
|
||||
- Python **3.10+**
|
||||
- Node.js **16+** (for JavaScript components)
|
||||
- Daily API key and room URL (see `env.example`)
|
||||
- Modern web browser with WebRTC support
|
||||
|
||||
---
|
||||
|
||||
### 💡 Notes
|
||||
- Ensure all dependencies are installed before running the server.
|
||||
- Check the `.env` file for missing configurations.
|
||||
- WebRTC requires a secure environment (HTTPS) for full functionality in production.
|
||||
|
||||
Happy coding! 🎉
|
||||
128
examples/p2p-webrtc/daily-interop-bridge/bot.py
Normal file
128
examples/p2p-webrtc/daily-interop-bridge/bot.py
Normal file
@@ -0,0 +1,128 @@
|
||||
#
|
||||
# Copyright (c) 2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
import os
|
||||
import sys
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
InputAudioRawFrame,
|
||||
InputImageRawFrame,
|
||||
OutputAudioRawFrame,
|
||||
OutputImageRawFrame,
|
||||
)
|
||||
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.frame_processor import Frame, FrameDirection, FrameProcessor
|
||||
from pipecat.transports.base_transport import TransportParams
|
||||
from pipecat.transports.network.small_webrtc import SmallWebRTCTransport
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
class MirrorProcessor(FrameProcessor):
    """Turn raw input audio/image frames into the matching output frames.

    Any other frame type is forwarded untouched in the direction it arrived.
    """

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Mirror media frames and pass everything else through.

        Args:
            frame: The frame received by this processor.
            direction: The direction the frame is travelling in the pipeline.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, InputAudioRawFrame):
            mirrored_audio = OutputAudioRawFrame(
                audio=frame.audio,
                sample_rate=frame.sample_rate,
                num_channels=frame.num_channels,
            )
            await self.push_frame(mirrored_audio)
            return

        if isinstance(frame, InputImageRawFrame):
            mirrored_image = OutputImageRawFrame(
                image=frame.image, size=frame.size, format=frame.format
            )
            await self.push_frame(mirrored_image)
            return

        await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
async def run_bot(webrtc_connection):
    """Bridge audio and video between a SmallWebRTC peer and a Daily room.

    Two pipeline branches run in parallel: one takes input from the Daily transport and
    sends it to the SmallWebRTC transport's output, the other does the reverse. Each
    branch uses a `MirrorProcessor` in between. The Daily room URL is read from the
    `DAILY_SAMPLE_ROOM_URL` environment variable.

    Args:
        webrtc_connection: The peer connection the SmallWebRTC transport is built on.
    """
    # Both transports are configured with the same media settings.
    media_settings = {
        "camera_in_enabled": True,
        "camera_out_enabled": True,
        "camera_out_is_live": True,
        "audio_in_enabled": True,
        "audio_out_enabled": True,
        "camera_out_width": 1280,
        "camera_out_height": 720,
        "vad_enabled": False,
    }

    pipecat_transport = SmallWebRTCTransport(
        webrtc_connection=webrtc_connection,
        params=TransportParams(**media_settings),
    )

    room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
    daily_transport = DailyTransport(
        room_url,
        None,
        "SmallWebRTC",
        params=DailyParams(**media_settings),
    )

    daily_to_webrtc = [
        daily_transport.input(),
        MirrorProcessor(),
        pipecat_transport.output(),
    ]
    webrtc_to_daily = [
        pipecat_transport.input(),
        MirrorProcessor(),
        daily_transport.output(),
    ]
    pipeline = Pipeline([ParallelPipeline(daily_to_webrtc, webrtc_to_daily)])

    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=False))

    @daily_transport.event_handler("on_participant_joined")
    async def on_participant_joined(transport, participant):
        await transport.capture_participant_video(participant["id"])

    @pipecat_transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Pipecat Client connected")

    @pipecat_transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Pipecat Client disconnected")

    @pipecat_transport.event_handler("on_client_closed")
    async def on_client_closed(transport, client):
        logger.info("Pipecat Client closed")
        await task.cancel()

    await PipelineRunner(handle_sigint=False).run(task)
|
||||
2
examples/p2p-webrtc/daily-interop-bridge/env.example
Normal file
2
examples/p2p-webrtc/daily-interop-bridge/env.example
Normal file
@@ -0,0 +1,2 @@
|
||||
DAILY_API_KEY=
|
||||
DAILY_SAMPLE_ROOM_URL=
|
||||
@@ -0,0 +1,5 @@
|
||||
python-dotenv
|
||||
fastapi[all]
|
||||
uvicorn
|
||||
aiortc
|
||||
pipecat-ai[silero, webrtc, daily]
|
||||
89
examples/p2p-webrtc/daily-interop-bridge/server.py
Normal file
89
examples/p2p-webrtc/daily-interop-bridge/server.py
Normal file
@@ -0,0 +1,89 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Dict
|
||||
|
||||
import uvicorn
|
||||
from bot import run_bot
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import BackgroundTasks, FastAPI
|
||||
from fastapi.responses import RedirectResponse
|
||||
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
|
||||
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger = logging.getLogger("pc")
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
# Store connections by pc_id
|
||||
pcs_map: Dict[str, SmallWebRTCConnection] = {}
|
||||
|
||||
ice_servers = ["stun:stun.l.google.com:19302"]
|
||||
|
||||
# Mount the prebuilt frontend at /prebuilt ("/" redirects there)
|
||||
app.mount("/prebuilt", SmallWebRTCPrebuiltUI)
|
||||
|
||||
|
||||
@app.get("/", include_in_schema=False)
async def root_redirect():
    """Redirect requests for the site root to the prebuilt UI path."""
    prebuilt_path = "/prebuilt/"
    return RedirectResponse(url=prebuilt_path)
|
||||
|
||||
|
||||
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
    """Handle an SDP offer, reusing or creating the peer connection for the caller.

    If the request carries a `pc_id` that is already tracked in `pcs_map`, that connection
    is renegotiated. Otherwise a new connection is initialized, a "closed" handler is
    attached to drop it from `pcs_map`, and `run_bot` is scheduled as a background task.

    Args:
        request: The JSON body; reads "sdp" and "type", and optionally "pc_id" and
            "restart_pc".
        background_tasks: FastAPI background task queue used to start the bot.

    Returns:
        The answer produced by the connection's `get_answer()`.
    """
    pc_id = request.get("pc_id")
    is_known_peer = bool(pc_id) and pc_id in pcs_map

    if is_known_peer:
        pipecat_connection = pcs_map[pc_id]
        logger.info(f"Reusing existing connection for pc_id: {pc_id}")
        await pipecat_connection.renegotiate(
            sdp=request["sdp"],
            type=request["type"],
            restart_pc=request.get("restart_pc", False),
        )
    else:
        pipecat_connection = SmallWebRTCConnection(ice_servers)
        await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])

        @pipecat_connection.event_handler("closed")
        async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
            closed_id = webrtc_connection.pc_id
            logger.info(f"Discarding peer connection for pc_id: {closed_id}")
            pcs_map.pop(closed_id, None)

        background_tasks.add_task(run_bot, pipecat_connection)

    answer = pipecat_connection.get_answer()
    # Updating the peer connection inside the map
    pcs_map[answer["pc_id"]] = pipecat_connection

    return answer
|
||||
|
||||
|
||||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook that closes all tracked peer connections on shutdown.

    Nothing is done at startup. After the application stops serving, every connection
    still held in `pcs_map` is closed concurrently and the map is emptied.

    Args:
        app: The FastAPI application this lifespan belongs to (unused).
    """
    yield  # Run app
    # Create all close() coroutines up front, before awaiting any of them: the "closed"
    # handler registered in `offer` pops entries from pcs_map, and the map must not be
    # mutated while it is being iterated.
    closers = [pc.close() for pc in pcs_map.values()]
    await asyncio.gather(*closers)
    pcs_map.clear()


# `app` is created above without a `lifespan=` argument, so without this registration the
# shutdown cleanup would never run.
app.router.lifespan_context = lifespan
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Command-line entry point: parse host/port/verbosity, set up logging, start uvicorn.
    cli = argparse.ArgumentParser(description="WebRTC demo")
    cli.add_argument(
        "--host", default="localhost", help="Host for HTTP server (default: localhost)"
    )
    cli.add_argument(
        "--port", type=int, default=7860, help="Port for HTTP server (default: 7860)"
    )
    cli.add_argument("--verbose", "-v", action="count")
    options = cli.parse_args()

    # Any -v flag switches logging to DEBUG; otherwise INFO.
    log_level = logging.DEBUG if options.verbose else logging.INFO
    logging.basicConfig(level=log_level)

    uvicorn.run(app, host=options.host, port=options.port)
|
||||
@@ -79,6 +79,8 @@ qwen = []
|
||||
rime = [ "websockets~=13.1" ]
|
||||
riva = [ "nvidia-riva-client~=2.19.0" ]
|
||||
sentry = [ "sentry-sdk~=2.23.1" ]
|
||||
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch==2.5.0", "torchaudio==2.5.0" ]
|
||||
remote-smart-turn = []
|
||||
silero = [ "onnxruntime~=1.20.1" ]
|
||||
simli = [ "simli-ai~=0.1.10"]
|
||||
soundfile = [ "soundfile~=0.13.0" ]
|
||||
|
||||
0
src/pipecat/audio/turn/__init__.py
Normal file
0
src/pipecat/audio/turn/__init__.py
Normal file
168
src/pipecat/audio/turn/base_smart_turn.py
Normal file
168
src/pipecat/audio/turn/base_smart_turn.py
Normal file
@@ -0,0 +1,168 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import time
from abc import abstractmethod
from typing import Any, Dict, Optional

import numpy as np
from loguru import logger
from pydantic import BaseModel

from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
|
||||
|
||||
# Default timing parameters
|
||||
STOP_SECS = 3
|
||||
PRE_SPEECH_MS = 0
|
||||
MAX_DURATION_SECONDS = 8 # Max allowed segment duration
|
||||
USE_ONLY_LAST_VAD_SEGMENT = True
|
||||
|
||||
|
||||
class SmartTurnParams(BaseModel):
    """Timing settings for smart-turn analyzers."""

    # Seconds of trailing silence after which a turn is treated as complete.
    stop_secs: float = STOP_SECS
    # Milliseconds of audio before the detected speech start to include in the segment.
    pre_speech_ms: float = PRE_SPEECH_MS
    # Upper bound, in seconds, on the audio segment passed to the model.
    max_duration_secs: float = MAX_DURATION_SECONDS
    # not exposing this for now yet until the model can handle it.
    # use_only_last_vad_segment: bool = USE_ONLY_LAST_VAD_SEGMENT
|
||||
|
||||
|
||||
class BaseSmartTurn(BaseTurnAnalyzer):
    """Shared buffering and end-of-turn logic for smart-turn analyzers.

    Incoming audio is stored as ``(arrival_time, float32_samples)`` tuples. A turn is
    reported complete either when trailing silence reaches ``params.stop_secs`` (checked
    in `append_audio`) or when the model behind `_predict_endpoint` predicts completion
    (checked in `analyze_end_of_turn`). Subclasses implement `_predict_endpoint`.
    """

    def __init__(
        self, *, sample_rate: Optional[int] = None, params: Optional[SmartTurnParams] = None
    ):
        """Initialize the analyzer.

        Args:
            sample_rate: Optional fixed sample rate, forwarded to the base class.
            params: Timing settings. A fresh `SmartTurnParams()` is created when omitted
                so that instances never share one mutable default object.
        """
        super().__init__(sample_rate=sample_rate)
        self._params = params if params is not None else SmartTurnParams()
        # Configuration
        self._stop_ms = self._params.stop_secs * 1000  # silence threshold in ms
        # Inference state
        self._audio_buffer = []
        self._speech_triggered = False
        self._silence_ms = 0
        self._speech_start_time = None

    @property
    def speech_triggered(self) -> bool:
        """Whether speech has been seen in the current turn."""
        return self._speech_triggered

    def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
        """Buffer an audio chunk and update silence tracking.

        Args:
            buffer: Raw audio bytes, interpreted as int16 samples.
            is_speech: Whether this chunk was classified as speech.

        Returns:
            `EndOfTurnState.COMPLETE` when accumulated silence after speech reaches the
            `stop_secs` threshold, otherwise `EndOfTurnState.INCOMPLETE`.
        """
        # Convert raw audio to float32 format and append to the buffer
        audio_int16 = np.frombuffer(buffer, dtype=np.int16)
        audio_float32 = audio_int16.astype(np.float32) / 32768.0
        self._audio_buffer.append((time.time(), audio_float32))

        state = EndOfTurnState.INCOMPLETE

        if is_speech:
            # Reset silence tracking on speech
            self._silence_ms = 0
            self._speech_triggered = True
            if self._speech_start_time is None:
                self._speech_start_time = time.time()
                logger.debug(f"Speech started at {self._speech_start_time}")
        elif self._speech_triggered:
            chunk_duration_ms = len(audio_int16) / (self._sample_rate / 1000)
            self._silence_ms += chunk_duration_ms
            # If silence exceeds threshold, mark end of turn
            if self._silence_ms >= self._stop_ms:
                logger.debug(
                    f"End of Turn complete due to stop_secs. Silence in ms: {self._silence_ms}"
                )
                state = EndOfTurnState.COMPLETE
                self._clear(state)
        else:
            # Trim buffer to prevent unbounded growth before speech
            max_buffer_time = (
                (self._params.pre_speech_ms / 1000)
                + self._params.stop_secs
                + self._params.max_duration_secs
            )
            cutoff = time.time() - max_buffer_time
            # Count the stale leading chunks and drop them in a single slice deletion
            # rather than shifting the whole list once per chunk with pop(0).
            stale = 0
            for timestamp, _ in self._audio_buffer:
                if timestamp >= cutoff:
                    break
                stale += 1
            if stale:
                del self._audio_buffer[:stale]

        return state

    def analyze_end_of_turn(self) -> EndOfTurnState:
        """Run the end-of-turn prediction on the buffered audio.

        The internal state is cleared when the turn is complete, and also whenever
        `USE_ONLY_LAST_VAD_SEGMENT` is set.

        Returns:
            The state derived from the model prediction.
        """
        logger.debug("Analyzing End of Turn...")
        state = self._process_speech_segment(self._audio_buffer)
        if state == EndOfTurnState.COMPLETE or USE_ONLY_LAST_VAD_SEGMENT:
            self._clear(state)
        logger.debug(f"End of Turn result: {state}")
        return state

    def _clear(self, turn_state: EndOfTurnState):
        """Reset internal state for the next turn or segment.

        Args:
            turn_state: The state that triggered the reset.
        """
        logger.debug("Clearing audio buffer...")
        # If the state is still incomplete, keep the _speech_triggered as True
        self._speech_triggered = turn_state == EndOfTurnState.INCOMPLETE
        self._audio_buffer = []
        self._speech_start_time = None
        self._silence_ms = 0

    def _process_speech_segment(self, audio_buffer) -> EndOfTurnState:
        """Extract the current speech segment from the buffer and run the prediction.

        Args:
            audio_buffer: List of ``(arrival_time, float32_samples)`` tuples.

        Returns:
            `EndOfTurnState.COMPLETE` if the model predicts the turn has ended,
            otherwise `EndOfTurnState.INCOMPLETE` (including for an empty buffer).
        """
        state = EndOfTurnState.INCOMPLETE

        if not audio_buffer:
            return state

        # Extract recent audio segment for prediction. With no recorded speech start
        # (e.g. only non-speech chunks arrived since the last reset) there is nothing
        # to anchor on, so the whole buffer is used instead of failing on None.
        start_index = 0
        if self._speech_start_time is not None:
            segment_start = self._speech_start_time - (self._params.pre_speech_ms / 1000)
            for i, (t, _) in enumerate(audio_buffer):
                if t >= segment_start:
                    start_index = i
                    break

        # Extract the audio segment
        segment_audio = np.concatenate([chunk for _, chunk in audio_buffer[start_index:]])

        logger.debug(f"Segment audio chunks after start index: {len(segment_audio)}")

        # Limit maximum duration
        max_samples = int(self._params.max_duration_secs * self.sample_rate)
        if len(segment_audio) > max_samples:
            # slices the array to keep the last max_samples samples, discarding the earlier part.
            segment_audio = segment_audio[-max_samples:]

        logger.debug(f"Segment audio chunks after limiting duration: {len(segment_audio)}")

        if len(segment_audio) > 0:
            predict_started = time.perf_counter()
            result = self._predict_endpoint(segment_audio)
            is_complete = result["prediction"] == 1
            state = EndOfTurnState.COMPLETE if is_complete else EndOfTurnState.INCOMPLETE
            predict_finished = time.perf_counter()

            logger.debug("--------")
            logger.debug(f"Prediction: {'Complete' if is_complete else 'Incomplete'}")
            logger.debug(f"Probability of complete: {result['probability']:.4f}")
            logger.debug(f"Prediction took {(predict_finished - predict_started) * 1000:.2f}ms")
        else:
            logger.debug(f"params: {self._params}, stop_ms: {self._stop_ms}")
            logger.debug("Captured empty audio segment, skipping prediction.")

        return state

    @abstractmethod
    def _predict_endpoint(self, buffer: np.ndarray) -> Dict[str, Any]:
        """
        Abstract method to predict if a turn has ended based on audio.

        Args:
            buffer: Float32 numpy array of audio samples at 16kHz.

        Returns:
            Dictionary with:
              - prediction: 1 if turn is complete, else 0
              - probability: Confidence of the prediction
        """
        pass
|
||||
81
src/pipecat/audio/turn/base_turn_analyzer.py
Normal file
81
src/pipecat/audio/turn/base_turn_analyzer.py
Normal file
@@ -0,0 +1,81 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class EndOfTurnState(Enum):
    """Outcome of an end-of-turn analysis."""

    # The analyzer considers the user's turn finished.
    COMPLETE = 1
    # The analyzer considers the user's turn still in progress.
    INCOMPLETE = 2
|
||||
|
||||
|
||||
class BaseTurnAnalyzer(ABC):
    """Interface shared by every user end-of-turn analyzer.

    The base class only manages the sample rate; subclasses provide speech
    tracking, audio buffering and the end-of-turn decision itself.
    """

    def __init__(self, *, sample_rate: Optional[int] = None):
        """Store the optional fixed sample rate.

        Args:
            sample_rate: Sample rate to force, or ``None`` to adopt whatever
                is later passed to ``set_sample_rate``.
        """
        # The effective rate stays at 0 until ``set_sample_rate`` is called.
        self._sample_rate = 0
        self._init_sample_rate = sample_rate

    @property
    def sample_rate(self) -> int:
        """Return the current sample rate.

        Returns:
            int: The effective sample rate for audio processing.
        """
        return self._sample_rate

    def set_sample_rate(self, sample_rate: int):
        """Set the sample rate used for audio processing.

        A sample rate given at construction time takes precedence; the
        argument is only used when no (truthy) initial rate was provided.

        Args:
            sample_rate (int): The sample rate to fall back to.
        """
        if self._init_sample_rate:
            self._sample_rate = self._init_sample_rate
        else:
            self._sample_rate = sample_rate

    @property
    @abstractmethod
    def speech_triggered(self) -> bool:
        """Report whether speech has been detected.

        Returns:
            bool: True if speech is triggered, otherwise False.
        """

    @abstractmethod
    def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
        """Feed a chunk of audio into the analyzer.

        Args:
            buffer (bytes): The audio data to append.
            is_speech (bool): Whether the appended audio is speech.

        Returns:
            EndOfTurnState: The resulting state after appending the audio.
        """

    @abstractmethod
    def analyze_end_of_turn(self) -> EndOfTurnState:
        """Decide whether an end of turn has occurred for the audio so far.

        Returns:
            EndOfTurnState: The result of the end of turn analysis.
        """
|
||||
65
src/pipecat/audio/turn/local_smart_turn.py
Normal file
65
src/pipecat/audio/turn/local_smart_turn.py
Normal file
@@ -0,0 +1,65 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import os
|
||||
from typing import Dict
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
|
||||
|
||||
try:
|
||||
import coremltools as ct
|
||||
from transformers import AutoFeatureExtractor
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use the LocalSmartTurnAnalyzer, you need to `pip install pipecat-ai[local-smart-turn]`."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class LocalCoreMLSmartTurnAnalyzer(BaseSmartTurn):
    """Smart-turn analyzer that runs a Core ML classifier on-device.

    The feature extractor is loaded from ``smart_turn_model_path`` and the
    Core ML package from ``coreml/smart_turn_classifier.mlpackage`` beneath
    that same path.
    """

    def __init__(self, smart_turn_model_path: str, **kwargs):
        """Load the feature extractor and the Core ML model.

        Args:
            smart_turn_model_path: Directory holding the model assets.
            **kwargs: Forwarded unchanged to ``BaseSmartTurn``.

        Raises:
            Exception: If ``smart_turn_model_path`` is empty.
        """
        super().__init__(**kwargs)

        if not smart_turn_model_path:
            logger.error("smart_turn_model_path is not set.")
            raise Exception("smart_turn_model_path must be provided.")

        mlpackage_path = f"{smart_turn_model_path}/coreml/smart_turn_classifier.mlpackage"

        logger.debug("Loading Local Smart Turn model...")
        # Only the processor is loaded from the checkpoint, not the torch model.
        self._turn_processor = AutoFeatureExtractor.from_pretrained(smart_turn_model_path)
        self._turn_model = ct.models.MLModel(mlpackage_path)
        logger.debug("Loaded Local Smart Turn")

    def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
        """Classify an audio segment as a complete or incomplete turn.

        Args:
            audio_array: Audio samples handed to the feature extractor, which
                is told they are sampled at 16000 Hz.

        Returns:
            Dictionary with ``prediction`` (1 when the class-1 probability
            exceeds 0.5, else 0) and ``probability`` (that class-1
            probability).
        """
        features = self._turn_processor(
            audio_array,
            sampling_rate=16000,
            padding="max_length",
            truncation=True,
            max_length=800,  # Maximum length as specified in training
            return_attention_mask=True,
            return_tensors="pt",
        )

        model_output = self._turn_model.predict(dict(features))

        # Core ML returns a numpy array; wrap it so torch can apply softmax.
        class_probabilities = torch.nn.functional.softmax(
            torch.tensor(model_output["logits"]), dim=1
        )
        # Index 1 is the "Complete" class.
        complete_probability = class_probabilities[0, 1].item()

        return {
            "prediction": int(complete_probability > 0.5),
            "probability": complete_probability,
        }
|
||||
75
src/pipecat/audio/turn/smart_turn.py
Normal file
75
src/pipecat/audio/turn/smart_turn.py
Normal file
@@ -0,0 +1,75 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import io
|
||||
import os
|
||||
from typing import Dict
|
||||
|
||||
import numpy as np
|
||||
import requests
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.base_smart_turn import BaseSmartTurn
|
||||
|
||||
|
||||
class SmartTurnAnalyzer(BaseSmartTurn):
    """Smart-turn analyzer that delegates prediction to a remote HTTP endpoint.

    The audio segment is serialized with ``np.save`` and POSTed as a raw
    ``application/octet-stream`` body; the endpoint's JSON reply is returned
    as the prediction result.
    """

    def __init__(self, url: str, **kwargs):
        """Store the endpoint URL and open a reusable HTTP session.

        Args:
            url: URL of the remote smart-turn endpoint.
            **kwargs: Forwarded unchanged to ``BaseSmartTurn``.

        Raises:
            Exception: If ``url`` is empty.
        """
        super().__init__(**kwargs)
        self.remote_smart_turn_url = url

        if not self.remote_smart_turn_url:
            logger.error("remote_smart_turn_url is not set.")
            raise Exception("remote_smart_turn_url must be provided.")

        # Use a session to reuse connections (keep-alive)
        self.session = requests.Session()
        self.session.headers.update({"Connection": "keep-alive"})

    def _serialize_array(self, audio_array: np.ndarray) -> bytes:
        """Return ``audio_array`` encoded in NumPy's ``.npy`` format."""
        logger.trace("Serializing NumPy array to bytes...")
        buffer = io.BytesIO()
        np.save(buffer, audio_array)
        serialized_bytes = buffer.getvalue()
        logger.trace(f"Serialized size: {len(serialized_bytes)} bytes")
        return serialized_bytes

    def _send_raw_request(self, data_bytes: bytes):
        """POST ``data_bytes`` to the endpoint and return the decoded JSON.

        Args:
            data_bytes: Raw request body.

        Returns:
            The parsed JSON body of a successful response.

        Raises:
            Exception: If the request fails, the server answers with an error
                status, or a successful response does not contain JSON.
        """
        headers = {"Content-Type": "application/octet-stream"}
        logger.trace(
            f"Sending {len(data_bytes)} bytes as raw body to {self.remote_smart_turn_url}..."
        )
        try:
            response = self.session.post(
                self.remote_smart_turn_url,
                data=data_bytes,
                headers=headers,
                timeout=60,
            )

            logger.trace("\n--- Response ---")
            logger.trace(f"Status Code: {response.status_code}")

            if not response.ok:
                logger.trace("Response Content (Error):")
                logger.trace(response.text)
                response.raise_for_status()

            try:
                logger.trace("Response JSON:")
                # Decode once and reuse the result for logging and returning.
                payload = response.json()
                logger.trace(payload)
                return payload
            except requests.exceptions.JSONDecodeError:
                logger.trace("Response Content (non-JSON):")
                logger.trace(response.text)
                # Previously this fell through and returned None, which the
                # caller then tried to subscript. Re-raise instead: requests'
                # JSONDecodeError is a RequestException, so the handler below
                # reports it like any other failed request.
                raise

        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to send raw request to Daily Smart Turn: {e}")
            raise Exception("Failed to send raw request to Daily Smart Turn.") from e

    def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, any]:
        """Serialize the audio and return the remote endpoint's JSON reply.

        Args:
            audio_array: Audio samples to classify.

        Returns:
            The decoded JSON response from the endpoint.

        Raises:
            Exception: Propagated from ``_send_raw_request`` on any failure.
        """
        serialized_array = self._serialize_array(audio_array)
        return self._send_raw_request(serialized_array)
|
||||
1
src/pipecat/py.typed
Normal file
1
src/pipecat/py.typed
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
@@ -185,7 +185,8 @@ class CartesiaTTSService(AudioContextWordTTSService):
|
||||
|
||||
async def _connect(self):
|
||||
await self._connect_websocket()
|
||||
if not self._receive_task:
|
||||
|
||||
if self._websocket and not self._receive_task:
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
|
||||
|
||||
async def _disconnect(self):
|
||||
@@ -197,7 +198,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
|
||||
|
||||
async def _connect_websocket(self):
|
||||
try:
|
||||
if self._websocket:
|
||||
if self._websocket and self._websocket.open:
|
||||
return
|
||||
logger.debug("Connecting to Cartesia")
|
||||
self._websocket = await websockets.connect(
|
||||
@@ -215,11 +216,11 @@ class CartesiaTTSService(AudioContextWordTTSService):
|
||||
if self._websocket:
|
||||
logger.debug("Disconnecting from Cartesia")
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
|
||||
self._context_id = None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
finally:
|
||||
self._context_id = None
|
||||
self._websocket = None
|
||||
|
||||
def _get_websocket(self):
|
||||
if self._websocket:
|
||||
@@ -279,7 +280,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
|
||||
logger.debug(f"{self}: Generating TTS [{text}]")
|
||||
|
||||
try:
|
||||
if not self._websocket:
|
||||
if not self._websocket or self._websocket.closed:
|
||||
await self._connect()
|
||||
|
||||
if not self._context_id:
|
||||
|
||||
@@ -309,10 +309,10 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
|
||||
async def _connect(self):
|
||||
await self._connect_websocket()
|
||||
|
||||
if not self._receive_task:
|
||||
if self._websocket and not self._receive_task:
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
|
||||
|
||||
if not self._keepalive_task:
|
||||
if self._websocket and not self._keepalive_task:
|
||||
self._keepalive_task = self.create_task(self._keepalive_task_handler())
|
||||
|
||||
async def _disconnect(self):
|
||||
@@ -328,7 +328,7 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
|
||||
|
||||
async def _connect_websocket(self):
|
||||
try:
|
||||
if self._websocket:
|
||||
if self._websocket and self._websocket.open:
|
||||
return
|
||||
|
||||
logger.debug("Connecting to ElevenLabs")
|
||||
@@ -375,11 +375,11 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
|
||||
logger.debug("Disconnecting from ElevenLabs")
|
||||
await self._websocket.send(json.dumps({"text": ""}))
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
|
||||
self._started = False
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
finally:
|
||||
self._started = False
|
||||
self._websocket = None
|
||||
|
||||
def _get_websocket(self):
|
||||
if self._websocket:
|
||||
@@ -419,7 +419,7 @@ class ElevenLabsTTSService(InterruptibleWordTTSService):
|
||||
logger.debug(f"{self}: Generating TTS [{text}]")
|
||||
|
||||
try:
|
||||
if not self._websocket:
|
||||
if not self._websocket or self._websocket.closed:
|
||||
await self._connect()
|
||||
|
||||
try:
|
||||
|
||||
@@ -104,7 +104,8 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
|
||||
async def _connect(self):
|
||||
await self._connect_websocket()
|
||||
if not self._receive_task:
|
||||
|
||||
if self._websocket and not self._receive_task:
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
|
||||
|
||||
async def _disconnect(self):
|
||||
@@ -116,7 +117,7 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
|
||||
async def _connect_websocket(self):
|
||||
try:
|
||||
if self._websocket:
|
||||
if self._websocket and self._websocket.open:
|
||||
return
|
||||
|
||||
logger.debug("Connecting to Fish Audio")
|
||||
@@ -141,16 +142,17 @@ class FishAudioTTSService(InterruptibleTTSService):
|
||||
stop_message = {"event": "stop"}
|
||||
await self._websocket.send(ormsgpack.packb(stop_message))
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
self._request_id = None
|
||||
self._started = False
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing websocket: {e}")
|
||||
finally:
|
||||
self._request_id = None
|
||||
self._started = False
|
||||
self._websocket = None
|
||||
|
||||
async def flush_audio(self):
|
||||
"""Flush any buffered audio by sending a flush event to Fish Audio."""
|
||||
logger.trace(f"{self}: Flushing audio buffers")
|
||||
if not self._websocket:
|
||||
if not self._websocket or self._websocket.closed:
|
||||
return
|
||||
flush_message = {"event": "flush"}
|
||||
await self._get_websocket().send(ormsgpack.packb(flush_message))
|
||||
|
||||
@@ -285,7 +285,7 @@ class GladiaSTTService(STTService):
|
||||
settings = self._prepare_settings()
|
||||
response = await self._setup_gladia(settings)
|
||||
self._websocket = await websockets.connect(response["url"])
|
||||
if not self._receive_task:
|
||||
if self._websocket and not self._receive_task:
|
||||
self._receive_task = self.create_task(self._receive_task_handler())
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
|
||||
@@ -109,7 +109,7 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
async def _connect(self):
|
||||
await self._connect_websocket()
|
||||
|
||||
if not self._receive_task:
|
||||
if self._websocket and not self._receive_task:
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
|
||||
|
||||
async def _disconnect(self):
|
||||
@@ -122,7 +122,7 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
async def _connect_websocket(self):
|
||||
"""Connect to LMNT websocket."""
|
||||
try:
|
||||
if self._websocket:
|
||||
if self._websocket and self._websocket.open:
|
||||
return
|
||||
|
||||
logger.debug("Connecting to LMNT")
|
||||
@@ -158,11 +158,11 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
# errors on the websocket, so we just skip it for now.
|
||||
# await self._websocket.send(json.dumps({"eof": True}))
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
|
||||
self._started = False
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
finally:
|
||||
self._started = False
|
||||
self._websocket = None
|
||||
|
||||
def _get_websocket(self):
|
||||
if self._websocket:
|
||||
@@ -170,7 +170,7 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
raise Exception("Websocket not connected")
|
||||
|
||||
async def flush_audio(self):
|
||||
if not self._websocket:
|
||||
if not self._websocket or self._websocket.closed:
|
||||
return
|
||||
await self._get_websocket().send(json.dumps({"flush": True}))
|
||||
|
||||
@@ -203,7 +203,7 @@ class LmntTTSService(InterruptibleTTSService):
|
||||
logger.debug(f"{self}: Generating TTS [{text}]")
|
||||
|
||||
try:
|
||||
if not self._websocket:
|
||||
if not self._websocket or self._websocket.closed:
|
||||
await self._connect()
|
||||
|
||||
try:
|
||||
|
||||
@@ -106,6 +106,9 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
self._started = False
|
||||
self._cumulative_time = 0
|
||||
|
||||
self._receive_task = None
|
||||
self._keepalive_task = None
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
@@ -159,8 +162,11 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
async def _connect(self):
|
||||
await self._connect_websocket()
|
||||
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
|
||||
self._keepalive_task = self.create_task(self._keepalive_task_handler())
|
||||
if self._websocket and not self._receive_task:
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
|
||||
|
||||
if self._websocket and not self._keepalive_task:
|
||||
self._keepalive_task = self.create_task(self._keepalive_task_handler())
|
||||
|
||||
async def _disconnect(self):
|
||||
if self._receive_task:
|
||||
@@ -175,6 +181,9 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
|
||||
async def _connect_websocket(self):
|
||||
try:
|
||||
if self._websocket and self._websocket.open:
|
||||
return
|
||||
|
||||
logger.debug("Connecting to Neuphonic")
|
||||
|
||||
tts_config = {
|
||||
@@ -190,7 +199,6 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
url = f"{self._url}/speak/{self._settings['lang_code']}?{'&'.join(query_params)}"
|
||||
|
||||
self._websocket = await websockets.connect(url)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
@@ -203,11 +211,11 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
if self._websocket:
|
||||
logger.debug("Disconnecting from Neuphonic")
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
|
||||
self._started = False
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
finally:
|
||||
self._started = False
|
||||
self._websocket = None
|
||||
|
||||
async def _receive_messages(self):
|
||||
async for message in self._websocket:
|
||||
@@ -235,7 +243,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
try:
|
||||
if not self._websocket:
|
||||
if not self._websocket or self._websocket.closed:
|
||||
await self._connect()
|
||||
|
||||
try:
|
||||
|
||||
@@ -157,7 +157,7 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
async def _connect(self):
|
||||
await self._connect_websocket()
|
||||
|
||||
if not self._receive_task:
|
||||
if self._websocket and not self._receive_task:
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
|
||||
|
||||
async def _disconnect(self):
|
||||
@@ -169,7 +169,7 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
|
||||
async def _connect_websocket(self):
|
||||
try:
|
||||
if self._websocket:
|
||||
if self._websocket and self._websocket.open:
|
||||
return
|
||||
|
||||
logger.debug("Connecting to PlayHT")
|
||||
@@ -197,11 +197,11 @@ class PlayHTTTSService(InterruptibleTTSService):
|
||||
if self._websocket:
|
||||
logger.debug("Disconnecting from PlayHT")
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
|
||||
self._request_id = None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
finally:
|
||||
self._request_id = None
|
||||
self._websocket = None
|
||||
|
||||
async def _get_websocket_url(self):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
|
||||
@@ -168,7 +168,7 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
"""Establish websocket connection and start receive task."""
|
||||
await self._connect_websocket()
|
||||
|
||||
if not self._receive_task:
|
||||
if self._websocket and not self._receive_task:
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
|
||||
|
||||
async def _disconnect(self):
|
||||
@@ -182,7 +182,7 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
async def _connect_websocket(self):
|
||||
"""Connect to Rime websocket API with configured settings."""
|
||||
try:
|
||||
if self._websocket:
|
||||
if self._websocket and self._websocket.open:
|
||||
return
|
||||
|
||||
params = "&".join(f"{k}={v}" for k, v in self._settings.items())
|
||||
@@ -201,10 +201,11 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
if self._websocket:
|
||||
await self._websocket.send(json.dumps(self._build_eos_msg()))
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
self._context_id = None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error closing websocket: {e}")
|
||||
finally:
|
||||
self._context_id = None
|
||||
self._websocket = None
|
||||
|
||||
def _get_websocket(self):
|
||||
"""Get active websocket connection or raise exception."""
|
||||
@@ -316,7 +317,7 @@ class RimeTTSService(AudioContextWordTTSService):
|
||||
"""
|
||||
logger.debug(f"{self}: Generating TTS [{text}]")
|
||||
try:
|
||||
if not self._websocket:
|
||||
if not self._websocket or self._websocket.closed:
|
||||
await self._connect()
|
||||
|
||||
try:
|
||||
|
||||
@@ -31,7 +31,7 @@ class WebsocketService(ABC):
|
||||
bool: True if connection is verified working, False otherwise
|
||||
"""
|
||||
try:
|
||||
if not self._websocket:
|
||||
if not self._websocket or self._websocket.closed:
|
||||
return False
|
||||
await self._websocket.ping()
|
||||
return True
|
||||
|
||||
@@ -10,6 +10,7 @@ from typing import Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
|
||||
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
|
||||
from pipecat.frames.frames import (
|
||||
BotInterruptionFrame,
|
||||
@@ -64,12 +65,19 @@ class BaseInputTransport(FrameProcessor):
|
||||
def vad_analyzer(self) -> Optional[VADAnalyzer]:
|
||||
return self._params.vad_analyzer
|
||||
|
||||
@property
def turn_analyzer(self) -> Optional[BaseTurnAnalyzer]:
    """Return the end-of-turn analyzer from the transport params, if any."""
    params = self._params
    return params.turn_analyzer
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
self._sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
|
||||
|
||||
# Configure VAD analyzer.
|
||||
if self._params.vad_enabled and self._params.vad_analyzer:
|
||||
self._params.vad_analyzer.set_sample_rate(self._sample_rate)
|
||||
# Configure End of turn analyzer.
|
||||
if self._params.turn_analyzer:
|
||||
self._params.turn_analyzer.set_sample_rate(self._sample_rate)
|
||||
# Start audio filter.
|
||||
if self._params.audio_in_filter:
|
||||
await self._params.audio_in_filter.start(self._sample_rate)
|
||||
@@ -187,10 +195,18 @@ class BaseInputTransport(FrameProcessor):
|
||||
and new_vad_state != VADState.STOPPING
|
||||
):
|
||||
frame = None
|
||||
if new_vad_state == VADState.SPEAKING:
|
||||
frame = UserStartedSpeakingFrame()
|
||||
elif new_vad_state == VADState.QUIET:
|
||||
frame = UserStoppedSpeakingFrame()
|
||||
# If the turn analyser is enabled, this will prevent:
|
||||
# - Creating the UserStoppedSpeakingFrame
|
||||
# - Creating the UserStartedSpeakingFrame multiple times
|
||||
can_create_user_frames = (
|
||||
self._params.turn_analyzer is None
|
||||
or not self._params.turn_analyzer.speech_triggered
|
||||
)
|
||||
if can_create_user_frames:
|
||||
if new_vad_state == VADState.SPEAKING:
|
||||
frame = UserStartedSpeakingFrame()
|
||||
elif new_vad_state == VADState.QUIET:
|
||||
frame = UserStoppedSpeakingFrame()
|
||||
|
||||
if frame:
|
||||
await self._handle_user_interruption(frame)
|
||||
@@ -198,6 +214,29 @@ class BaseInputTransport(FrameProcessor):
|
||||
vad_state = new_vad_state
|
||||
return vad_state
|
||||
|
||||
async def _handle_end_of_turn(self):
|
||||
if self.turn_analyzer:
|
||||
state = await self.get_event_loop().run_in_executor(
|
||||
self._executor, self.turn_analyzer.analyze_end_of_turn
|
||||
)
|
||||
await self._handle_end_of_turn_complete(state)
|
||||
|
||||
async def _handle_end_of_turn_complete(self, state: EndOfTurnState):
    """Emit a user-stopped-speaking frame when the turn is complete.

    Args:
        state: Result of the end-of-turn analysis; any value other than
            ``EndOfTurnState.COMPLETE`` is ignored.
    """
    if state != EndOfTurnState.COMPLETE:
        return
    await self._handle_user_interruption(UserStoppedSpeakingFrame())
|
||||
|
||||
async def _run_turn_analyzer(
    self, frame: InputAudioRawFrame, vad_state: VADState, previous_vad_state: VADState
):
    """Feed one audio frame to the turn analyzer and react to the outcome.

    Args:
        frame: Audio frame whose ``audio`` payload is appended to the analyzer.
        vad_state: VAD state computed for this frame.
        previous_vad_state: VAD state before this frame was processed.
    """
    is_speech = vad_state in (VADState.SPEAKING, VADState.STARTING)

    # Per the original note: once silence exceeds the analyzer's threshold,
    # append_audio itself reports EndOfTurnState.COMPLETE.
    end_of_turn_state = self._params.turn_analyzer.append_audio(frame.audio, is_speech)
    if end_of_turn_state == EndOfTurnState.COMPLETE:
        await self._handle_end_of_turn_complete(end_of_turn_state)
        return

    # Otherwise, on the transition into QUIET, ask the analyzer whether the
    # turn is complete.
    became_quiet = vad_state == VADState.QUIET and vad_state != previous_vad_state
    if became_quiet:
        await self._handle_end_of_turn()
|
||||
|
||||
async def _audio_task_handler(self):
|
||||
vad_state: VADState = VADState.QUIET
|
||||
while True:
|
||||
@@ -211,10 +250,14 @@ class BaseInputTransport(FrameProcessor):
|
||||
|
||||
# Check VAD and push event if necessary. We just care about
|
||||
# changes from QUIET to SPEAKING and vice versa.
|
||||
previous_vad_state = vad_state
|
||||
if self._params.vad_enabled:
|
||||
vad_state = await self._handle_vad(frame, vad_state)
|
||||
audio_passthrough = self._params.vad_audio_passthrough
|
||||
|
||||
if self._params.turn_analyzer:
|
||||
await self._run_turn_analyzer(frame, vad_state, previous_vad_state)
|
||||
|
||||
# Push audio downstream if passthrough.
|
||||
if audio_passthrough:
|
||||
await self.push_frame(frame)
|
||||
|
||||
@@ -386,10 +386,13 @@ class BaseOutputTransport(FrameProcessor):
|
||||
async def _draw_image(self, frame: OutputImageRawFrame):
|
||||
desired_size = (self._params.camera_out_width, self._params.camera_out_height)
|
||||
|
||||
# TODO: we should refactor in the future to support dynamic resolutions
|
||||
# which is kind of what happens in P2P connections.
|
||||
# We need to add support for that inside the DailyTransport
|
||||
if frame.size != desired_size:
|
||||
image = Image.frombytes(frame.format, frame.size, frame.image)
|
||||
resized_image = image.resize(desired_size)
|
||||
logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
|
||||
# logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
|
||||
frame = OutputImageRawFrame(
|
||||
resized_image.tobytes(), resized_image.size, resized_image.format
|
||||
)
|
||||
|
||||
@@ -11,6 +11,7 @@ from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
|
||||
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
|
||||
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.utils.base_object import BaseObject
|
||||
@@ -41,6 +42,7 @@ class TransportParams(BaseModel):
|
||||
vad_enabled: bool = False
|
||||
vad_audio_passthrough: bool = False
|
||||
vad_analyzer: Optional[VADAnalyzer] = None
|
||||
turn_analyzer: Optional[BaseTurnAnalyzer] = None
|
||||
|
||||
|
||||
class BaseTransport(BaseObject):
|
||||
|
||||
Reference in New Issue
Block a user