Compare commits

...

17 Commits

Author SHA1 Message Date
James Hush
d77ed9948d Save order 2025-04-30 16:04:38 +08:00
Mark Backman
10cdc47e05 Merge pull request #1689 from pipecat-ai/mb/handle-http-smart-turn-errors
Handle case where Fal Smart Turn returns a 500 error
2025-04-29 14:21:45 -04:00
Mark Backman
2b4d41a548 Merge pull request #1693 from flixoflax/bump-modal-example-dependencies
Bump modal deployment example dependencies
2025-04-29 14:21:31 -04:00
Filipi da Silva Fuchter
962f8062a5 Merge pull request #1581 from pipecat-ai/voice_agent_ice_servers
Configuring the voice-agent example to wait for all the ice candidates.
2025-04-29 13:30:12 -03:00
Filipi Fuchter
d80d385b2f Adding a section explaining about ice servers. 2025-04-29 12:19:59 -03:00
Filipi Fuchter
b347ca472f Checking the state again to avoid any eventual race condition 2025-04-29 12:04:20 -03:00
Filipi Fuchter
c3c4952abf Reducing the timeout to 2 seconds for gathering the ice candidates. 2025-04-29 11:34:58 -03:00
Filipi Fuchter
f369ab4c1a Printing each new ice candidate. 2025-04-29 11:23:33 -03:00
flixoflax
62b41c6789 removed aiohttp from deps, and version constraint from pipecat-ai 2025-04-29 16:14:51 +02:00
Filipi da Silva Fuchter
2872bc7902 Merge pull request #1587 from pipecat-ai/improving_ice_servers
Updated `SmallWebRTCConnection` to support `ice_servers` with credentials.
2025-04-29 11:07:32 -03:00
Filipi Fuchter
9658b75a10 Configuring the voice-agent example to use ice-servers and wait for all the ice candidates. 2025-04-29 10:52:07 -03:00
flixoflax
63de9039e6 Bumb modal deployment example deps 2025-04-29 15:50:56 +02:00
Filipi Fuchter
9352396d7e Mentioning the new feature in the changelog. 2025-04-29 10:40:09 -03:00
Filipi Fuchter
d1ab1d38b7 Fixing the examples to use the new IceServer structure. 2025-04-29 10:33:19 -03:00
Filipi Fuchter
080f70d91c Allowing to define the username and credential for the ice servers. 2025-04-29 10:32:42 -03:00
Mark Backman
ebed1fc6ea Restructure _send_raw_request to raise errors early, then process the successful response 2025-04-29 09:05:14 -04:00
Mark Backman
144ae9b611 Handle case where Fal Smart Turn returns a 500 error 2025-04-29 08:53:02 -04:00
11 changed files with 212 additions and 42 deletions

View File

@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Updated `SmallWebRTCConnection` to support `ice_servers` with credentials.
- Added `VADUserStartedSpeakingFrame` and `VADUserStoppedSpeakingFrame`,
indicating when the VAD detected the user to start and stop speaking. These
events are helpful when using smart turn detection, as the user's stop time
@@ -58,9 +60,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Deprecated
- Function calls with parameters `(function_name, tool_call_id, args, llm,
context, result_callback)` are deprectated, use a single `FunctionCallParams`
parameter instead.
- Function calls with parameters
`(function_name, tool_call_id, args, llm, context, result_callback)` are
deprectated, use a single `FunctionCallParams` parameter instead.
- `TransportParams.camera_*` parameters are now deprecated, use
`TransportParams.video_*` instead.
@@ -73,6 +75,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue with HTTP Smart Turn handling, where the service returns a 500
error. Previously, this would cause an unhandled exception. Now, a 500 error
is treated as an incomplete response.
- Fixed a TTS services issue that could cause assistant output not to be
aggregated to the context when also using `TTSSpeakFrame`s.

View File

@@ -1,5 +1,4 @@
python-dotenv==1.0.1
modal==0.71.3
pipecat-ai[daily,silero,cartesia,openai]==0.0.52
pipecat-ai[daily,silero,cartesia,openai]
fastapi==0.115.6
aiohttp==3.11.11

View File

@@ -20,7 +20,7 @@ from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
@@ -30,7 +30,11 @@ app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = ["stun:stun.l.google.com:19302"]
ice_servers = [
IceServer(
urls="stun:stun.l.google.com:19302",
)
]
# Mount the frontend at /
app.mount("/client", SmallWebRTCPrebuiltUI)

View File

@@ -18,7 +18,7 @@ from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
@@ -28,7 +28,11 @@ app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = ["stun:stun.l.google.com:19302"]
ice_servers = [
IceServer(
urls="stun:stun.l.google.com:19302",
)
]
# Mount the frontend at /
app.mount("/prebuilt", SmallWebRTCPrebuiltUI)

View File

@@ -18,7 +18,7 @@ from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
@@ -28,7 +28,11 @@ app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = ["stun:stun.l.google.com:19302"]
ice_servers = [
IceServer(
urls="stun:stun.l.google.com:19302",
)
]
# Mount the frontend at /
app.mount("/prebuilt", SmallWebRTCPrebuiltUI)

View File

@@ -46,6 +46,46 @@ http://localhost:7860
---
## WebRTC ICE Servers Configuration
When implementing WebRTC in your project, **STUN** (Session Traversal Utilities for NAT) and **TURN** (Traversal Using Relays around NAT)
servers are usually needed in cases where users are behind routers or firewalls.
In local networks (e.g., testing within the same home or office network), you usually dont need to configure STUN or TURN servers.
In such cases, WebRTC can often directly establish peer-to-peer connections without needing to traverse NAT or firewalls.
### What are STUN and TURN Servers?
- **STUN Server**: Helps clients discover their public IP address and port when they're behind a NAT (Network Address Translation) device (like a router).
This allows WebRTC to attempt direct peer-to-peer communication by providing the public-facing IP and port.
- **TURN Server**: Used as a fallback when direct peer-to-peer communication isn't possible due to strict NATs or firewalls blocking connections.
The TURN server relays media traffic between peers.
### Why are ICE Servers Important?
**ICE (Interactive Connectivity Establishment)** is a framework used by WebRTC to handle network traversal and NAT issues.
The `iceServers` configuration provides a list of **STUN** and **TURN** servers that WebRTC uses to find the best way to connect two peers.
### Example Configuration for ICE Servers
Heres how you can configure a basic `iceServers` object in WebRTC for testing purposes, using Google's public STUN server:
```javascript
const config = {
iceServers: [
{
urls: ["stun:stun.l.google.com:19302"], // Google's public STUN server
}
],
};
```
> For testing purposes, you can either use public **STUN** servers (like Google's) or set up your own **TURN** server.
If you're running your own TURN server, make sure to include your server URL, username, and credential in the configuration.
---
### 💡 Notes
- Ensure all dependencies are installed before running the server.
- Check the `.env` file for missing configurations.

View File

@@ -24,27 +24,47 @@
let connected = false
let peerConnection = null
/*const waitForIceGatheringComplete = async (pc) => {
const waitForIceGatheringComplete = async (pc, timeoutMs = 2000) => {
if (pc.iceGatheringState === 'complete') return;
console.log("Waiting for ICE gathering to complete. Current state:", pc.iceGatheringState);
return new Promise((resolve) => {
let timeoutId;
const checkState = () => {
console.log("icegatheringstatechange:", pc.iceGatheringState);
if (pc.iceGatheringState === 'complete') {
pc.removeEventListener('icegatheringstatechange', checkState);
cleanup();
resolve();
}
};
const onTimeout = () => {
console.warn(`ICE gathering timed out after ${timeoutMs} ms.`);
cleanup();
resolve();
};
const cleanup = () => {
pc.removeEventListener('icegatheringstatechange', checkState);
clearTimeout(timeoutId);
};
pc.addEventListener('icegatheringstatechange', checkState);
timeoutId = setTimeout(onTimeout, timeoutMs);
// Checking the state again to avoid any eventual race condition
checkState();
});
}*/
};
const createSmallWebRTCConnection = async (audioTrack) => {
const pc = new RTCPeerConnection()
const config = {
iceServers: [],
};
const pc = new RTCPeerConnection(config)
addPeerConnectionEventListeners(pc)
pc.ontrack = e => audioEl.srcObject = e.streams[0]
// SmallWebRTCTransport expects to receive both transceivers
pc.addTransceiver(audioTrack, { direction: 'sendrecv' })
pc.addTransceiver('video', { direction: 'sendrecv' })
await pc.setLocalDescription(await pc.createOffer())
//await waitForIceGatheringComplete(pc)
await waitForIceGatheringComplete(pc)
const offer = pc.localDescription
const response = await fetch('/api/offer', {
body: JSON.stringify({ sdp: offer.sdp, type: offer.type}),
@@ -57,16 +77,37 @@
}
const connect = async () => {
_onConnecting()
const audioStream = await navigator.mediaDevices.getUserMedia({audio: true})
peerConnection= await createSmallWebRTCConnection(audioStream.getAudioTracks()[0])
peerConnection.onconnectionstatechange = () => {
let connectionState = peerConnection?.connectionState
}
const addPeerConnectionEventListeners = (pc) => {
pc.oniceconnectionstatechange = () => {
console.log("oniceconnectionstatechange", pc?.iceConnectionState)
}
pc.onconnectionstatechange = () => {
console.log("onconnectionstatechange", pc?.connectionState)
let connectionState = pc?.connectionState
if (connectionState === 'connected') {
_onConnected()
} else if (connectionState === 'disconnected') {
_onDisconnected()
}
}
pc.onicecandidate = (event) => {
if (event.candidate) {
console.log("New ICE candidate:", event.candidate);
} else {
console.log("All ICE candidates have been sent.");
}
};
}
const _onConnecting = () => {
statusEl.textContent = "Connecting"
buttonEl.textContent = "Disconnect"
connected = true
}
const _onConnected = () => {

View File

@@ -17,7 +17,7 @@ from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import FileResponse
from loguru import logger
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
@@ -28,6 +28,13 @@ app = FastAPI()
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = [
IceServer(
urls="stun:stun.l.google.com:19302",
)
]
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
pc_id = request.get("pc_id")
@@ -37,7 +44,7 @@ async def offer(request: dict, background_tasks: BackgroundTasks):
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(sdp=request["sdp"], type=request["type"])
else:
pipecat_connection = SmallWebRTCConnection()
pipecat_connection = SmallWebRTCConnection(ice_servers)
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")

View File

@@ -32,13 +32,17 @@ from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
InterimTranscriptionFrame,
OutputImageRawFrame,
SpriteFrame,
STTMuteFrame,
TranscriptionFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.stt_mute_filter import STTMuteConfig, STTMuteFilter, STTMuteStrategy
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
@@ -103,6 +107,30 @@ class TalkingAnimation(FrameProcessor):
await self.push_frame(frame, direction)
class TranscriptionMuteProcessor(FrameProcessor):
def __init__(self):
super().__init__()
self._is_muted = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, STTMuteFrame):
self._is_muted = frame.mute
if isinstance(
frame,
(TranscriptionFrame, InterimTranscriptionFrame),
):
# Only pass VAD-related frames when not muted
if not self._is_muted:
await self.push_frame(frame, direction)
else:
logger.trace(
f"{frame.__class__.__name__} suppressed - Transcription currently muted"
)
async def main():
"""Main bot execution function.
@@ -183,10 +211,23 @@ async def main():
#
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
# Configure the mute processor with both strategies
stt_mute_processor = STTMuteFilter(
config=STTMuteConfig(
strategies={
STTMuteStrategy.ALWAYS,
}
),
)
transcription_mute_processor = TranscriptionMuteProcessor()
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt_mute_processor, # Add the mute processor before STT
transcription_mute_processor,
context_aggregator.user(),
llm,
tts,

View File

@@ -40,7 +40,7 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
async def _send_raw_request(self, data_bytes: bytes) -> Dict[str, Any]:
headers = {"Content-Type": "application/octet-stream"}
headers.update(self._headers)
logger.trace(f"Sending {len(data_bytes)} bytes as raw body to {self._url}...")
try:
timeout = aiohttp.ClientTimeout(total=self._params.stop_secs)
@@ -50,23 +50,30 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
logger.trace("\n--- Response ---")
logger.trace(f"Status Code: {response.status}")
if response.status == 200:
try:
json_data = await response.json()
logger.trace("Response JSON:")
logger.trace(json_data)
return json_data
except aiohttp.ContentTypeError:
# Non-JSON response
text = await response.text()
logger.trace("Response Content (non-JSON):")
logger.trace(text)
raise Exception(f"Non-JSON response: {text}")
else:
# Check if successful
if response.status != 200:
error_text = await response.text()
logger.trace("Response Content (Error):")
logger.trace(error_text)
response.raise_for_status()
if response.status == 500:
logger.warning(f"Smart turn service returned 500 error: {error_text}")
raise Exception(f"Server returned HTTP 500: {error_text}")
else:
response.raise_for_status()
# Process successful response
try:
json_data = await response.json()
logger.trace("Response JSON:")
logger.trace(json_data)
return json_data
except aiohttp.ContentTypeError:
# Non-JSON response
text = await response.text()
logger.trace("Response Content (non-JSON):")
logger.trace(text)
raise Exception(f"Non-JSON response: {text}")
except asyncio.TimeoutError:
logger.error(f"Request timed out after {self._params.stop_secs} seconds")
@@ -76,5 +83,14 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
raise Exception("Failed to send raw request to Daily Smart Turn.")
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
serialized_array = self._serialize_array(audio_array)
return await self._send_raw_request(serialized_array)
try:
serialized_array = self._serialize_array(audio_array)
return await self._send_raw_request(serialized_array)
except Exception as e:
logger.error(f"Smart turn prediction failed: {str(e)}")
# Return an incomplete prediction when a failure occurs
return {
"prediction": 0,
"probability": 0.0,
"metrics": {"inference_time": 0.0, "total_time": 0.0},
}

View File

@@ -7,7 +7,7 @@
import asyncio
import json
import time
from typing import Any, Literal, Optional, Union
from typing import Any, List, Literal, Optional, Union
from av.frame import Frame
from loguru import logger
@@ -87,13 +87,21 @@ class SmallWebRTCTrack:
return getattr(self._track, name)
# Alias so we don't need to expose RTCIceServer
IceServer = RTCIceServer
class SmallWebRTCConnection(BaseObject):
def __init__(self, ice_servers=None):
def __init__(self, ice_servers: Optional[Union[List[str], List[IceServer]]] = None):
super().__init__()
if ice_servers:
self.ice_servers = [RTCIceServer(urls=server) for server in ice_servers]
if not ice_servers:
self.ice_servers: List[IceServer] = []
elif all(isinstance(s, IceServer) for s in ice_servers):
self.ice_servers = ice_servers
elif all(isinstance(s, str) for s in ice_servers):
self.ice_servers = [IceServer(urls=s) for s in ice_servers]
else:
self.ice_servers = []
raise TypeError("ice_servers must be either List[str] or List[RTCIceServer]")
self._connect_invoked = False
self._track_map = {}
self._track_getters = {