Compare commits
17 Commits
hush/smart
...
hush/muteT
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d77ed9948d | ||
|
|
10cdc47e05 | ||
|
|
2b4d41a548 | ||
|
|
962f8062a5 | ||
|
|
d80d385b2f | ||
|
|
b347ca472f | ||
|
|
c3c4952abf | ||
|
|
f369ab4c1a | ||
|
|
62b41c6789 | ||
|
|
2872bc7902 | ||
|
|
9658b75a10 | ||
|
|
63de9039e6 | ||
|
|
9352396d7e | ||
|
|
d1ab1d38b7 | ||
|
|
080f70d91c | ||
|
|
ebed1fc6ea | ||
|
|
144ae9b611 |
12
CHANGELOG.md
12
CHANGELOG.md
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Added
|
||||
|
||||
- Updated `SmallWebRTCConnection` to support `ice_servers` with credentials.
|
||||
|
||||
- Added `VADUserStartedSpeakingFrame` and `VADUserStoppedSpeakingFrame`,
|
||||
indicating when the VAD detected the user to start and stop speaking. These
|
||||
events are helpful when using smart turn detection, as the user's stop time
|
||||
@@ -58,9 +60,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Deprecated
|
||||
|
||||
- Function calls with parameters `(function_name, tool_call_id, args, llm,
|
||||
context, result_callback)` are deprectated, use a single `FunctionCallParams`
|
||||
parameter instead.
|
||||
- Function calls with parameters
|
||||
`(function_name, tool_call_id, args, llm, context, result_callback)` are
|
||||
deprectated, use a single `FunctionCallParams` parameter instead.
|
||||
|
||||
- `TransportParams.camera_*` parameters are now deprecated, use
|
||||
`TransportParams.video_*` instead.
|
||||
@@ -73,6 +75,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue with HTTP Smart Turn handling, where the service returns a 500
|
||||
error. Previously, this would cause an unhandled exception. Now, a 500 error
|
||||
is treated as an incomplete response.
|
||||
|
||||
- Fixed a TTS services issue that could cause assistant output not to be
|
||||
aggregated to the context when also using `TTSSpeakFrame`s.
|
||||
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
python-dotenv==1.0.1
|
||||
modal==0.71.3
|
||||
pipecat-ai[daily,silero,cartesia,openai]==0.0.52
|
||||
pipecat-ai[daily,silero,cartesia,openai]
|
||||
fastapi==0.115.6
|
||||
aiohttp==3.11.11
|
||||
|
||||
@@ -20,7 +20,7 @@ from fastapi.responses import RedirectResponse
|
||||
from loguru import logger
|
||||
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
|
||||
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv(override=True)
|
||||
@@ -30,7 +30,11 @@ app = FastAPI()
|
||||
# Store connections by pc_id
|
||||
pcs_map: Dict[str, SmallWebRTCConnection] = {}
|
||||
|
||||
ice_servers = ["stun:stun.l.google.com:19302"]
|
||||
ice_servers = [
|
||||
IceServer(
|
||||
urls="stun:stun.l.google.com:19302",
|
||||
)
|
||||
]
|
||||
|
||||
# Mount the frontend at /
|
||||
app.mount("/client", SmallWebRTCPrebuiltUI)
|
||||
|
||||
@@ -18,7 +18,7 @@ from fastapi.responses import RedirectResponse
|
||||
from loguru import logger
|
||||
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
|
||||
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv(override=True)
|
||||
@@ -28,7 +28,11 @@ app = FastAPI()
|
||||
# Store connections by pc_id
|
||||
pcs_map: Dict[str, SmallWebRTCConnection] = {}
|
||||
|
||||
ice_servers = ["stun:stun.l.google.com:19302"]
|
||||
ice_servers = [
|
||||
IceServer(
|
||||
urls="stun:stun.l.google.com:19302",
|
||||
)
|
||||
]
|
||||
|
||||
# Mount the frontend at /
|
||||
app.mount("/prebuilt", SmallWebRTCPrebuiltUI)
|
||||
|
||||
@@ -18,7 +18,7 @@ from fastapi.responses import RedirectResponse
|
||||
from loguru import logger
|
||||
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
|
||||
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv(override=True)
|
||||
@@ -28,7 +28,11 @@ app = FastAPI()
|
||||
# Store connections by pc_id
|
||||
pcs_map: Dict[str, SmallWebRTCConnection] = {}
|
||||
|
||||
ice_servers = ["stun:stun.l.google.com:19302"]
|
||||
ice_servers = [
|
||||
IceServer(
|
||||
urls="stun:stun.l.google.com:19302",
|
||||
)
|
||||
]
|
||||
|
||||
# Mount the frontend at /
|
||||
app.mount("/prebuilt", SmallWebRTCPrebuiltUI)
|
||||
|
||||
@@ -46,6 +46,46 @@ http://localhost:7860
|
||||
|
||||
---
|
||||
|
||||
## WebRTC ICE Servers Configuration
|
||||
|
||||
When implementing WebRTC in your project, **STUN** (Session Traversal Utilities for NAT) and **TURN** (Traversal Using Relays around NAT)
|
||||
servers are usually needed in cases where users are behind routers or firewalls.
|
||||
|
||||
In local networks (e.g., testing within the same home or office network), you usually don’t need to configure STUN or TURN servers.
|
||||
In such cases, WebRTC can often directly establish peer-to-peer connections without needing to traverse NAT or firewalls.
|
||||
|
||||
### What are STUN and TURN Servers?
|
||||
|
||||
- **STUN Server**: Helps clients discover their public IP address and port when they're behind a NAT (Network Address Translation) device (like a router).
|
||||
This allows WebRTC to attempt direct peer-to-peer communication by providing the public-facing IP and port.
|
||||
|
||||
- **TURN Server**: Used as a fallback when direct peer-to-peer communication isn't possible due to strict NATs or firewalls blocking connections.
|
||||
The TURN server relays media traffic between peers.
|
||||
|
||||
### Why are ICE Servers Important?
|
||||
|
||||
**ICE (Interactive Connectivity Establishment)** is a framework used by WebRTC to handle network traversal and NAT issues.
|
||||
The `iceServers` configuration provides a list of **STUN** and **TURN** servers that WebRTC uses to find the best way to connect two peers.
|
||||
|
||||
### Example Configuration for ICE Servers
|
||||
|
||||
Here’s how you can configure a basic `iceServers` object in WebRTC for testing purposes, using Google's public STUN server:
|
||||
|
||||
```javascript
|
||||
const config = {
|
||||
iceServers: [
|
||||
{
|
||||
urls: ["stun:stun.l.google.com:19302"], // Google's public STUN server
|
||||
}
|
||||
],
|
||||
};
|
||||
```
|
||||
|
||||
> For testing purposes, you can either use public **STUN** servers (like Google's) or set up your own **TURN** server.
|
||||
If you're running your own TURN server, make sure to include your server URL, username, and credential in the configuration.
|
||||
|
||||
---
|
||||
|
||||
### 💡 Notes
|
||||
- Ensure all dependencies are installed before running the server.
|
||||
- Check the `.env` file for missing configurations.
|
||||
|
||||
@@ -24,27 +24,47 @@
|
||||
let connected = false
|
||||
let peerConnection = null
|
||||
|
||||
/*const waitForIceGatheringComplete = async (pc) => {
|
||||
const waitForIceGatheringComplete = async (pc, timeoutMs = 2000) => {
|
||||
if (pc.iceGatheringState === 'complete') return;
|
||||
console.log("Waiting for ICE gathering to complete. Current state:", pc.iceGatheringState);
|
||||
return new Promise((resolve) => {
|
||||
let timeoutId;
|
||||
const checkState = () => {
|
||||
console.log("icegatheringstatechange:", pc.iceGatheringState);
|
||||
if (pc.iceGatheringState === 'complete') {
|
||||
pc.removeEventListener('icegatheringstatechange', checkState);
|
||||
cleanup();
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
const onTimeout = () => {
|
||||
console.warn(`ICE gathering timed out after ${timeoutMs} ms.`);
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
const cleanup = () => {
|
||||
pc.removeEventListener('icegatheringstatechange', checkState);
|
||||
clearTimeout(timeoutId);
|
||||
};
|
||||
pc.addEventListener('icegatheringstatechange', checkState);
|
||||
timeoutId = setTimeout(onTimeout, timeoutMs);
|
||||
// Checking the state again to avoid any eventual race condition
|
||||
checkState();
|
||||
});
|
||||
}*/
|
||||
};
|
||||
|
||||
|
||||
const createSmallWebRTCConnection = async (audioTrack) => {
|
||||
const pc = new RTCPeerConnection()
|
||||
const config = {
|
||||
iceServers: [],
|
||||
};
|
||||
const pc = new RTCPeerConnection(config)
|
||||
addPeerConnectionEventListeners(pc)
|
||||
pc.ontrack = e => audioEl.srcObject = e.streams[0]
|
||||
// SmallWebRTCTransport expects to receive both transceivers
|
||||
pc.addTransceiver(audioTrack, { direction: 'sendrecv' })
|
||||
pc.addTransceiver('video', { direction: 'sendrecv' })
|
||||
await pc.setLocalDescription(await pc.createOffer())
|
||||
//await waitForIceGatheringComplete(pc)
|
||||
await waitForIceGatheringComplete(pc)
|
||||
const offer = pc.localDescription
|
||||
const response = await fetch('/api/offer', {
|
||||
body: JSON.stringify({ sdp: offer.sdp, type: offer.type}),
|
||||
@@ -57,16 +77,37 @@
|
||||
}
|
||||
|
||||
const connect = async () => {
|
||||
_onConnecting()
|
||||
const audioStream = await navigator.mediaDevices.getUserMedia({audio: true})
|
||||
peerConnection= await createSmallWebRTCConnection(audioStream.getAudioTracks()[0])
|
||||
peerConnection.onconnectionstatechange = () => {
|
||||
let connectionState = peerConnection?.connectionState
|
||||
}
|
||||
|
||||
const addPeerConnectionEventListeners = (pc) => {
|
||||
pc.oniceconnectionstatechange = () => {
|
||||
console.log("oniceconnectionstatechange", pc?.iceConnectionState)
|
||||
}
|
||||
pc.onconnectionstatechange = () => {
|
||||
console.log("onconnectionstatechange", pc?.connectionState)
|
||||
let connectionState = pc?.connectionState
|
||||
if (connectionState === 'connected') {
|
||||
_onConnected()
|
||||
} else if (connectionState === 'disconnected') {
|
||||
_onDisconnected()
|
||||
}
|
||||
}
|
||||
pc.onicecandidate = (event) => {
|
||||
if (event.candidate) {
|
||||
console.log("New ICE candidate:", event.candidate);
|
||||
} else {
|
||||
console.log("All ICE candidates have been sent.");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
const _onConnecting = () => {
|
||||
statusEl.textContent = "Connecting"
|
||||
buttonEl.textContent = "Disconnect"
|
||||
connected = true
|
||||
}
|
||||
|
||||
const _onConnected = () => {
|
||||
|
||||
@@ -17,7 +17,7 @@ from fastapi import BackgroundTasks, FastAPI
|
||||
from fastapi.responses import FileResponse
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
|
||||
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv(override=True)
|
||||
@@ -28,6 +28,13 @@ app = FastAPI()
|
||||
pcs_map: Dict[str, SmallWebRTCConnection] = {}
|
||||
|
||||
|
||||
ice_servers = [
|
||||
IceServer(
|
||||
urls="stun:stun.l.google.com:19302",
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
@app.post("/api/offer")
|
||||
async def offer(request: dict, background_tasks: BackgroundTasks):
|
||||
pc_id = request.get("pc_id")
|
||||
@@ -37,7 +44,7 @@ async def offer(request: dict, background_tasks: BackgroundTasks):
|
||||
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
|
||||
await pipecat_connection.renegotiate(sdp=request["sdp"], type=request["type"])
|
||||
else:
|
||||
pipecat_connection = SmallWebRTCConnection()
|
||||
pipecat_connection = SmallWebRTCConnection(ice_servers)
|
||||
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
|
||||
|
||||
@pipecat_connection.event_handler("closed")
|
||||
|
||||
@@ -32,13 +32,17 @@ from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
OutputImageRawFrame,
|
||||
SpriteFrame,
|
||||
STTMuteFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.filters.stt_mute_filter import STTMuteConfig, STTMuteFilter, STTMuteStrategy
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
|
||||
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
|
||||
@@ -103,6 +107,30 @@ class TalkingAnimation(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class TranscriptionMuteProcessor(FrameProcessor):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._is_muted = False
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, STTMuteFrame):
|
||||
self._is_muted = frame.mute
|
||||
|
||||
if isinstance(
|
||||
frame,
|
||||
(TranscriptionFrame, InterimTranscriptionFrame),
|
||||
):
|
||||
# Only pass VAD-related frames when not muted
|
||||
if not self._is_muted:
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
logger.trace(
|
||||
f"{frame.__class__.__name__} suppressed - Transcription currently muted"
|
||||
)
|
||||
|
||||
|
||||
async def main():
|
||||
"""Main bot execution function.
|
||||
|
||||
@@ -183,10 +211,23 @@ async def main():
|
||||
#
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
# Configure the mute processor with both strategies
|
||||
stt_mute_processor = STTMuteFilter(
|
||||
config=STTMuteConfig(
|
||||
strategies={
|
||||
STTMuteStrategy.ALWAYS,
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
transcription_mute_processor = TranscriptionMuteProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
rtvi,
|
||||
stt_mute_processor, # Add the mute processor before STT
|
||||
transcription_mute_processor,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
|
||||
@@ -40,7 +40,7 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
|
||||
async def _send_raw_request(self, data_bytes: bytes) -> Dict[str, Any]:
|
||||
headers = {"Content-Type": "application/octet-stream"}
|
||||
headers.update(self._headers)
|
||||
logger.trace(f"Sending {len(data_bytes)} bytes as raw body to {self._url}...")
|
||||
|
||||
try:
|
||||
timeout = aiohttp.ClientTimeout(total=self._params.stop_secs)
|
||||
|
||||
@@ -50,23 +50,30 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
|
||||
logger.trace("\n--- Response ---")
|
||||
logger.trace(f"Status Code: {response.status}")
|
||||
|
||||
if response.status == 200:
|
||||
try:
|
||||
json_data = await response.json()
|
||||
logger.trace("Response JSON:")
|
||||
logger.trace(json_data)
|
||||
return json_data
|
||||
except aiohttp.ContentTypeError:
|
||||
# Non-JSON response
|
||||
text = await response.text()
|
||||
logger.trace("Response Content (non-JSON):")
|
||||
logger.trace(text)
|
||||
raise Exception(f"Non-JSON response: {text}")
|
||||
else:
|
||||
# Check if successful
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
logger.trace("Response Content (Error):")
|
||||
logger.trace(error_text)
|
||||
response.raise_for_status()
|
||||
|
||||
if response.status == 500:
|
||||
logger.warning(f"Smart turn service returned 500 error: {error_text}")
|
||||
raise Exception(f"Server returned HTTP 500: {error_text}")
|
||||
else:
|
||||
response.raise_for_status()
|
||||
|
||||
# Process successful response
|
||||
try:
|
||||
json_data = await response.json()
|
||||
logger.trace("Response JSON:")
|
||||
logger.trace(json_data)
|
||||
return json_data
|
||||
except aiohttp.ContentTypeError:
|
||||
# Non-JSON response
|
||||
text = await response.text()
|
||||
logger.trace("Response Content (non-JSON):")
|
||||
logger.trace(text)
|
||||
raise Exception(f"Non-JSON response: {text}")
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"Request timed out after {self._params.stop_secs} seconds")
|
||||
@@ -76,5 +83,14 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
|
||||
raise Exception("Failed to send raw request to Daily Smart Turn.")
|
||||
|
||||
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
|
||||
serialized_array = self._serialize_array(audio_array)
|
||||
return await self._send_raw_request(serialized_array)
|
||||
try:
|
||||
serialized_array = self._serialize_array(audio_array)
|
||||
return await self._send_raw_request(serialized_array)
|
||||
except Exception as e:
|
||||
logger.error(f"Smart turn prediction failed: {str(e)}")
|
||||
# Return an incomplete prediction when a failure occurs
|
||||
return {
|
||||
"prediction": 0,
|
||||
"probability": 0.0,
|
||||
"metrics": {"inference_time": 0.0, "total_time": 0.0},
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from typing import Any, Literal, Optional, Union
|
||||
from typing import Any, List, Literal, Optional, Union
|
||||
|
||||
from av.frame import Frame
|
||||
from loguru import logger
|
||||
@@ -87,13 +87,21 @@ class SmallWebRTCTrack:
|
||||
return getattr(self._track, name)
|
||||
|
||||
|
||||
# Alias so we don't need to expose RTCIceServer
|
||||
IceServer = RTCIceServer
|
||||
|
||||
|
||||
class SmallWebRTCConnection(BaseObject):
|
||||
def __init__(self, ice_servers=None):
|
||||
def __init__(self, ice_servers: Optional[Union[List[str], List[IceServer]]] = None):
|
||||
super().__init__()
|
||||
if ice_servers:
|
||||
self.ice_servers = [RTCIceServer(urls=server) for server in ice_servers]
|
||||
if not ice_servers:
|
||||
self.ice_servers: List[IceServer] = []
|
||||
elif all(isinstance(s, IceServer) for s in ice_servers):
|
||||
self.ice_servers = ice_servers
|
||||
elif all(isinstance(s, str) for s in ice_servers):
|
||||
self.ice_servers = [IceServer(urls=s) for s in ice_servers]
|
||||
else:
|
||||
self.ice_servers = []
|
||||
raise TypeError("ice_servers must be either List[str] or List[RTCIceServer]")
|
||||
self._connect_invoked = False
|
||||
self._track_map = {}
|
||||
self._track_getters = {
|
||||
|
||||
Reference in New Issue
Block a user