Compare commits

...

19 Commits

Author SHA1 Message Date
James Hush
4721c07712 Just start with the user idle timer 2025-05-02 14:16:34 +08:00
James Hush
1e4f04330a Use StartConversationFrame 2025-04-30 14:27:08 +08:00
James Hush
3b0d4aa87b demo: say a default message after 4 seconds of being idle or after user stops speaking once 2025-04-30 12:24:02 +08:00
Mark Backman
10cdc47e05 Merge pull request #1689 from pipecat-ai/mb/handle-http-smart-turn-errors
Handle case where Fal Smart Turn returns a 500 error
2025-04-29 14:21:45 -04:00
Mark Backman
2b4d41a548 Merge pull request #1693 from flixoflax/bump-modal-example-dependencies
Bump modal deployment example dependencies
2025-04-29 14:21:31 -04:00
Filipi da Silva Fuchter
962f8062a5 Merge pull request #1581 from pipecat-ai/voice_agent_ice_servers
Configuring the voice-agent example to wait for all the ice candidates.
2025-04-29 13:30:12 -03:00
Filipi Fuchter
d80d385b2f Adding a section explaining about ice servers. 2025-04-29 12:19:59 -03:00
Filipi Fuchter
b347ca472f Checking the state again to avoid any eventual race condition 2025-04-29 12:04:20 -03:00
Filipi Fuchter
c3c4952abf Reducing the timeout to 2 seconds for gathering the ice candidates. 2025-04-29 11:34:58 -03:00
Filipi Fuchter
f369ab4c1a Printing each new ice candidate. 2025-04-29 11:23:33 -03:00
flixoflax
62b41c6789 removed aiohttp from deps, and version constraint from pipecat-ai 2025-04-29 16:14:51 +02:00
Filipi da Silva Fuchter
2872bc7902 Merge pull request #1587 from pipecat-ai/improving_ice_servers
Updated `SmallWebRTCConnection` to support `ice_servers` with credentials.
2025-04-29 11:07:32 -03:00
Filipi Fuchter
9658b75a10 Configuring the voice-agent example to use ice-servers and wait for all the ice candidates. 2025-04-29 10:52:07 -03:00
flixoflax
63de9039e6 Bumb modal deployment example deps 2025-04-29 15:50:56 +02:00
Filipi Fuchter
9352396d7e Mentioning the new feature in the changelog. 2025-04-29 10:40:09 -03:00
Filipi Fuchter
d1ab1d38b7 Fixing the examples to use the new IceServer structure. 2025-04-29 10:33:19 -03:00
Filipi Fuchter
080f70d91c Allowing to define the username and credential for the ice servers. 2025-04-29 10:32:42 -03:00
Mark Backman
ebed1fc6ea Restructure _send_raw_request to raise errors early, then process the successful response 2025-04-29 09:05:14 -04:00
Mark Backman
144ae9b611 Handle case where Fal Smart Turn returns a 500 error 2025-04-29 08:53:02 -04:00
11 changed files with 275 additions and 56 deletions

View File

@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Updated `SmallWebRTCConnection` to support `ice_servers` with credentials.
- Added `VADUserStartedSpeakingFrame` and `VADUserStoppedSpeakingFrame`,
indicating when the VAD detected the user to start and stop speaking. These
events are helpful when using smart turn detection, as the user's stop time
@@ -58,9 +60,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Deprecated
- Function calls with parameters `(function_name, tool_call_id, args, llm,
context, result_callback)` are deprecated, use a single `FunctionCallParams`
parameter instead.
- Function calls with parameters
`(function_name, tool_call_id, args, llm, context, result_callback)` are
deprecated, use a single `FunctionCallParams` parameter instead.
- `TransportParams.camera_*` parameters are now deprecated, use
`TransportParams.video_*` instead.
@@ -73,6 +75,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue with HTTP Smart Turn handling, where the service returns a 500
error. Previously, this would cause an unhandled exception. Now, a 500 error
is treated as an incomplete response.
- Fixed a TTS services issue that could cause assistant output not to be
aggregated to the context when also using `TTSSpeakFrame`s.

View File

@@ -1,5 +1,4 @@
python-dotenv==1.0.1
modal==0.71.3
pipecat-ai[daily,silero,cartesia,openai]==0.0.52
pipecat-ai[daily,silero,cartesia,openai]
fastapi==0.115.6
aiohttp==3.11.11

View File

@@ -6,16 +6,30 @@
import argparse
import os
from dataclasses import dataclass
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame
from pipecat.frames.frames import (
BotSpeakingFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
EndFrame,
Frame,
LLMMessagesFrame,
StartInterruptionFrame,
TTSSpeakFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
@@ -27,6 +41,84 @@ from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
load_dotenv(override=True)
class DebugObserver(BaseObserver):
    """Observer that logs interruption and bot speaking events to the console.

    Logs every pushed frame that is an instance of:
    - StartInterruptionFrame
    - BotStartedSpeakingFrame
    - BotStoppedSpeakingFrame

    This allows you to see the frame flow from processor to processor through
    the pipeline for these frames.

    Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
    (the arrow is "→" for downstream frames and "←" for any other direction).
    """

    async def on_push_frame(
        self,
        src: FrameProcessor,
        dst: FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
        timestamp: int,
    ):
        """Log the frame if it is one of the observed event types.

        Args:
            src: Processor that pushed the frame.
            dst: Processor the frame is being pushed to.
            frame: The frame being pushed.
            direction: Direction the frame is travelling in.
            timestamp: Time of the push; divided by 1e9 below to obtain
                seconds (i.e. treated as nanoseconds).
        """
        # Convert timestamp to seconds for readability
        time_sec = timestamp / 1_000_000_000

        # Direction arrow: previously both branches were empty strings, so the
        # log line never showed which way the frame was flowing.
        arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"

        if isinstance(frame, StartInterruptionFrame):
            logger.info(f"⚡ INTERRUPTION START: {src} {arrow} {dst} at {time_sec:.2f}s")
        elif isinstance(frame, BotStartedSpeakingFrame):
            logger.info(f"🤖 BOT START SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
        elif isinstance(frame, BotStoppedSpeakingFrame):
            logger.info(f"🤖 BOT STOP SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
@dataclass
class StartConversationFrame(Frame):
    """Marker frame signalling that a conversation should begin.

    It carries no fields of its own; processors in the pipeline can react to
    its type to trigger whatever action or response starts the conversation.
    """
class ConversationStarterProcessor(FrameProcessor):
    """Speak a fixed message the first time a conversation trigger is seen.

    A trigger is either a ``StartConversationFrame`` or a
    ``UserStoppedSpeakingFrame``. Only the very first trigger produces the
    message; every later trigger, and every other frame, is passed through.
    """

    def __init__(self, message: str = "Hi! I'm a default message!"):
        """Create the processor.

        Args:
            message: Text pushed as a ``TTSSpeakFrame`` on the first trigger.
        """
        super().__init__()
        self.message = message
        # Number of trigger frames seen so far (both trigger types are counted).
        self._user_stopped_speaking_count = 0

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Say the default message on the first trigger frame.

        Triggers are ``StartConversationFrame`` and ``UserStoppedSpeakingFrame``.
        On the first one, a ``TTSSpeakFrame`` with the configured message is
        pushed and the triggering frame itself is not forwarded. All other
        frames are forwarded in the direction they arrived from.

        Args:
            frame: The frame to process
            direction: Direction of the frame flow
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, (StartConversationFrame, UserStoppedSpeakingFrame)):
            self._user_stopped_speaking_count += 1
            logger.info(
                f"++ {frame.name} User stopped speaking, count: {self._user_stopped_speaking_count}"
            )
            if self._user_stopped_speaking_count == 1:
                # First trigger: speak the message instead of forwarding the
                # triggering frame (it is consumed here, as before).
                await self.push_frame(TTSSpeakFrame(self.message))
                return

        # Pass everything else through, preserving its direction so upstream
        # frames are not accidentally re-sent downstream.
        await self.push_frame(frame, direction)
async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace):
logger.info(f"Starting bot")
@@ -59,15 +151,10 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
context_aggregator = llm.create_context_aggregator(context)
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
logger.info(f"User idle, timeout : {user_idle._timeout} retry count: {retry_count}")
if retry_count == 1:
# First attempt: Add a gentle prompt to the conversation
messages.append(
{
"role": "system",
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
}
)
await user_idle.push_frame(LLMMessagesFrame(messages))
# First attempt: Trigger the conversation starter
await user_idle.push_frame(StartConversationFrame())
return True
elif retry_count == 2:
# Second attempt: More direct prompt
@@ -84,16 +171,19 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
await user_idle.push_frame(
TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
)
await task.queue_frame(EndFrame())
await user_idle.push_frame(EndFrame(), FrameDirection.UPSTREAM)
return False
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=5.0)
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=4.0)
conversation_starter = ConversationStarterProcessor(message="This is a default message.")
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_idle, # Idle user check-in
conversation_starter,
context_aggregator.user(),
llm, # LLM
tts, # TTS
@@ -108,15 +198,15 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
observers=[DebugObserver()],
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
# Start the user idle timer
await task.queue_frames([BotSpeakingFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -20,7 +20,7 @@ from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
@@ -30,7 +30,11 @@ app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = ["stun:stun.l.google.com:19302"]
ice_servers = [
IceServer(
urls="stun:stun.l.google.com:19302",
)
]
# Mount the frontend at /
app.mount("/client", SmallWebRTCPrebuiltUI)

View File

@@ -18,7 +18,7 @@ from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
@@ -28,7 +28,11 @@ app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = ["stun:stun.l.google.com:19302"]
ice_servers = [
IceServer(
urls="stun:stun.l.google.com:19302",
)
]
# Mount the frontend at /
app.mount("/prebuilt", SmallWebRTCPrebuiltUI)

View File

@@ -18,7 +18,7 @@ from fastapi.responses import RedirectResponse
from loguru import logger
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
@@ -28,7 +28,11 @@ app = FastAPI()
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = ["stun:stun.l.google.com:19302"]
ice_servers = [
IceServer(
urls="stun:stun.l.google.com:19302",
)
]
# Mount the frontend at /
app.mount("/prebuilt", SmallWebRTCPrebuiltUI)

View File

@@ -46,6 +46,46 @@ http://localhost:7860
---
## WebRTC ICE Servers Configuration
When implementing WebRTC in your project, **STUN** (Session Traversal Utilities for NAT) and **TURN** (Traversal Using Relays around NAT)
servers are usually needed in cases where users are behind routers or firewalls.
In local networks (e.g., testing within the same home or office network), you usually don't need to configure STUN or TURN servers.
In such cases, WebRTC can often directly establish peer-to-peer connections without needing to traverse NAT or firewalls.
### What are STUN and TURN Servers?
- **STUN Server**: Helps clients discover their public IP address and port when they're behind a NAT (Network Address Translation) device (like a router).
This allows WebRTC to attempt direct peer-to-peer communication by providing the public-facing IP and port.
- **TURN Server**: Used as a fallback when direct peer-to-peer communication isn't possible due to strict NATs or firewalls blocking connections.
The TURN server relays media traffic between peers.
### Why are ICE Servers Important?
**ICE (Interactive Connectivity Establishment)** is a framework used by WebRTC to handle network traversal and NAT issues.
The `iceServers` configuration provides a list of **STUN** and **TURN** servers that WebRTC uses to find the best way to connect two peers.
### Example Configuration for ICE Servers
Here's how you can configure a basic `iceServers` object in WebRTC for testing purposes, using Google's public STUN server:
```javascript
const config = {
iceServers: [
{
urls: ["stun:stun.l.google.com:19302"], // Google's public STUN server
}
],
};
```
> For testing purposes, you can either use public **STUN** servers (like Google's) or set up your own **TURN** server.
If you're running your own TURN server, make sure to include your server URL, username, and credential in the configuration.
---
### 💡 Notes
- Ensure all dependencies are installed before running the server.
- Check the `.env` file for missing configurations.

View File

@@ -24,27 +24,47 @@
let connected = false
let peerConnection = null
/*const waitForIceGatheringComplete = async (pc) => {
const waitForIceGatheringComplete = async (pc, timeoutMs = 2000) => {
if (pc.iceGatheringState === 'complete') return;
console.log("Waiting for ICE gathering to complete. Current state:", pc.iceGatheringState);
return new Promise((resolve) => {
let timeoutId;
const checkState = () => {
console.log("icegatheringstatechange:", pc.iceGatheringState);
if (pc.iceGatheringState === 'complete') {
pc.removeEventListener('icegatheringstatechange', checkState);
cleanup();
resolve();
}
};
const onTimeout = () => {
console.warn(`ICE gathering timed out after ${timeoutMs} ms.`);
cleanup();
resolve();
};
const cleanup = () => {
pc.removeEventListener('icegatheringstatechange', checkState);
clearTimeout(timeoutId);
};
pc.addEventListener('icegatheringstatechange', checkState);
timeoutId = setTimeout(onTimeout, timeoutMs);
// Checking the state again to avoid any eventual race condition
checkState();
});
}*/
};
const createSmallWebRTCConnection = async (audioTrack) => {
const pc = new RTCPeerConnection()
const config = {
iceServers: [],
};
const pc = new RTCPeerConnection(config)
addPeerConnectionEventListeners(pc)
pc.ontrack = e => audioEl.srcObject = e.streams[0]
// SmallWebRTCTransport expects to receive both transceivers
pc.addTransceiver(audioTrack, { direction: 'sendrecv' })
pc.addTransceiver('video', { direction: 'sendrecv' })
await pc.setLocalDescription(await pc.createOffer())
//await waitForIceGatheringComplete(pc)
await waitForIceGatheringComplete(pc)
const offer = pc.localDescription
const response = await fetch('/api/offer', {
body: JSON.stringify({ sdp: offer.sdp, type: offer.type}),
@@ -57,16 +77,37 @@
}
const connect = async () => {
_onConnecting()
const audioStream = await navigator.mediaDevices.getUserMedia({audio: true})
peerConnection= await createSmallWebRTCConnection(audioStream.getAudioTracks()[0])
peerConnection.onconnectionstatechange = () => {
let connectionState = peerConnection?.connectionState
}
const addPeerConnectionEventListeners = (pc) => {
pc.oniceconnectionstatechange = () => {
console.log("oniceconnectionstatechange", pc?.iceConnectionState)
}
pc.onconnectionstatechange = () => {
console.log("onconnectionstatechange", pc?.connectionState)
let connectionState = pc?.connectionState
if (connectionState === 'connected') {
_onConnected()
} else if (connectionState === 'disconnected') {
_onDisconnected()
}
}
pc.onicecandidate = (event) => {
if (event.candidate) {
console.log("New ICE candidate:", event.candidate);
} else {
console.log("All ICE candidates have been sent.");
}
};
}
const _onConnecting = () => {
statusEl.textContent = "Connecting"
buttonEl.textContent = "Disconnect"
connected = true
}
const _onConnected = () => {

View File

@@ -17,7 +17,7 @@ from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import FileResponse
from loguru import logger
from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection
from pipecat.transports.network.webrtc_connection import IceServer, SmallWebRTCConnection
# Load environment variables
load_dotenv(override=True)
@@ -28,6 +28,13 @@ app = FastAPI()
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = [
IceServer(
urls="stun:stun.l.google.com:19302",
)
]
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
pc_id = request.get("pc_id")
@@ -37,7 +44,7 @@ async def offer(request: dict, background_tasks: BackgroundTasks):
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(sdp=request["sdp"], type=request["type"])
else:
pipecat_connection = SmallWebRTCConnection()
pipecat_connection = SmallWebRTCConnection(ice_servers)
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")

View File

@@ -40,7 +40,7 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
async def _send_raw_request(self, data_bytes: bytes) -> Dict[str, Any]:
headers = {"Content-Type": "application/octet-stream"}
headers.update(self._headers)
logger.trace(f"Sending {len(data_bytes)} bytes as raw body to {self._url}...")
try:
timeout = aiohttp.ClientTimeout(total=self._params.stop_secs)
@@ -50,23 +50,30 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
logger.trace("\n--- Response ---")
logger.trace(f"Status Code: {response.status}")
if response.status == 200:
try:
json_data = await response.json()
logger.trace("Response JSON:")
logger.trace(json_data)
return json_data
except aiohttp.ContentTypeError:
# Non-JSON response
text = await response.text()
logger.trace("Response Content (non-JSON):")
logger.trace(text)
raise Exception(f"Non-JSON response: {text}")
else:
# Check if successful
if response.status != 200:
error_text = await response.text()
logger.trace("Response Content (Error):")
logger.trace(error_text)
response.raise_for_status()
if response.status == 500:
logger.warning(f"Smart turn service returned 500 error: {error_text}")
raise Exception(f"Server returned HTTP 500: {error_text}")
else:
response.raise_for_status()
# Process successful response
try:
json_data = await response.json()
logger.trace("Response JSON:")
logger.trace(json_data)
return json_data
except aiohttp.ContentTypeError:
# Non-JSON response
text = await response.text()
logger.trace("Response Content (non-JSON):")
logger.trace(text)
raise Exception(f"Non-JSON response: {text}")
except asyncio.TimeoutError:
logger.error(f"Request timed out after {self._params.stop_secs} seconds")
@@ -76,5 +83,14 @@ class HttpSmartTurnAnalyzer(BaseSmartTurn):
raise Exception("Failed to send raw request to Daily Smart Turn.")
async def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
serialized_array = self._serialize_array(audio_array)
return await self._send_raw_request(serialized_array)
try:
serialized_array = self._serialize_array(audio_array)
return await self._send_raw_request(serialized_array)
except Exception as e:
logger.error(f"Smart turn prediction failed: {str(e)}")
# Return an incomplete prediction when a failure occurs
return {
"prediction": 0,
"probability": 0.0,
"metrics": {"inference_time": 0.0, "total_time": 0.0},
}

View File

@@ -7,7 +7,7 @@
import asyncio
import json
import time
from typing import Any, Literal, Optional, Union
from typing import Any, List, Literal, Optional, Union
from av.frame import Frame
from loguru import logger
@@ -87,13 +87,21 @@ class SmallWebRTCTrack:
return getattr(self._track, name)
# Alias so we don't need to expose RTCIceServer
IceServer = RTCIceServer
class SmallWebRTCConnection(BaseObject):
def __init__(self, ice_servers=None):
def __init__(self, ice_servers: Optional[Union[List[str], List[IceServer]]] = None):
super().__init__()
if ice_servers:
self.ice_servers = [RTCIceServer(urls=server) for server in ice_servers]
if not ice_servers:
self.ice_servers: List[IceServer] = []
elif all(isinstance(s, IceServer) for s in ice_servers):
self.ice_servers = ice_servers
elif all(isinstance(s, str) for s in ice_servers):
self.ice_servers = [IceServer(urls=s) for s in ice_servers]
else:
self.ice_servers = []
raise TypeError("ice_servers must be either List[str] or List[RTCIceServer]")
self._connect_invoked = False
self._track_map = {}
self._track_getters = {