Compare commits

...

52 Commits

Author SHA1 Message Date
Chad Bailey
1472a3abb8 attempt at 2 pipelines 2025-02-24 21:25:13 +00:00
Dominic
3745078bf1 Fixed logic 2025-02-24 10:44:07 -08:00
Dominic
1a2c98f70b Starting to add logic for native audio input for flash lite 2025-02-24 10:28:28 -08:00
Dominic
e988ce6838 Forgot to use the same logic for the openai bot 2025-02-22 14:52:53 -08:00
Dominic
546c97e75b Simplified logic for dialin 2025-02-22 14:49:33 -08:00
Dominic
410a6b9238 moved terminate call to handlers class 2025-02-22 14:38:14 -08:00
Dominic
281b56e5de Updated prompt for non gemini bot to look for more voicemail examples, plus added logic to detect if we're doing dialin or not to avoid a non-fatal dialin related error 2025-02-21 16:19:59 -08:00
Dominic
c66042afb6 Fixed import ordering 2025-02-20 14:56:45 -08:00
Dominic Stewart
61f8e54dec Merge branch 'main' into dom/gemini-system-prompt-switching 2025-02-20 14:48:45 -08:00
Dominic
390adf193a Added a few more things to detect in the prompt 2025-02-20 14:44:12 -08:00
Dominic
68587ca4e9 Updated the code to use the correct prompt broken down into smaller pieces 2025-02-20 14:28:02 -08:00
Dominic
b71ad2d082 I think this works 2025-02-20 09:42:19 -08:00
Dominic
781652f4f9 Improvement 2025-02-20 09:27:34 -08:00
Aleix Conchillo Flaqué
81093d3bed Merge pull request #1252 from pipecat-ai/aleix/remove-vad-extra-logging
BaseInputTransport: remove VAD logging
2025-02-20 07:32:20 -08:00
Aleix Conchillo Flaqué
d9a67164f6 Merge pull request #1251 from pipecat-ai/aleix/fish-tts-service-push-stop-frame
FishAudioTTSService should push TTSStoppedFrame
2025-02-20 07:32:05 -08:00
Aleix Conchillo Flaqué
98259af54e update CHANGELOG 2025-02-19 22:05:48 -08:00
Dominic Stewart
039d144c79 examples(phone-bot): updated example to use Gemini (#1233) 2025-02-19 22:03:37 -08:00
Aleix Conchillo Flaqué
d0f67fc189 BaseInputTransport: remove VAD logging
These logs are very verbose. They were added to try to find an issue that
resulted in being because of low CPU/memory resources, but these logs were not
helpful to determine that.
2025-02-19 21:55:11 -08:00
Aleix Conchillo Flaqué
6e3f96aa83 fish: automatically send TTSStoppedFrame after timeout 2025-02-19 21:41:18 -08:00
Aleix Conchillo Flaqué
293677588d tts: make push_stop_frames default to 2.0s 2025-02-19 21:39:00 -08:00
Dominic
621813571a This works 2025-02-19 20:24:27 -08:00
Filipi da Silva Fuchter
77e777b1ce Merge pull request #1249 from pipecat-ai/invoking_call_start_function
Fixed an issue that `start_callback` was not invoked for some LLM services
2025-02-19 18:09:00 -03:00
Filipi Fuchter
7e7926059c Fixed an issue that start_callback was not invoked for some LLM services. 2025-02-19 18:04:20 -03:00
Aleix Conchillo Flaqué
c948754eff Merge pull request #1248 from pipecat-ai/aleix/daily-transport-room-url
daily: add room_url property
2025-02-19 09:46:46 -08:00
Aleix Conchillo Flaqué
83f1a8830d daily: add room_url property 2025-02-19 09:29:53 -08:00
James Hush
80f8e05fcf docs: fix transcripts in translation chatbot example (#1199) 2025-02-19 16:07:22 +08:00
Aleix Conchillo Flaqué
afd1a1e80b Merge pull request #1245 from pipecat-ai/aleix/stt-mute-filter-trace-logging 2025-02-18 21:21:55 -08:00
Dominic
ceefea8d63 Changed example to use gemini 2.0 flash lite 2025-02-18 19:08:22 -08:00
Dominic
1974474480 Updated the readme 2025-02-18 18:16:27 -08:00
Dominic
160d054aa5 Updated the readme 2025-02-18 18:10:34 -08:00
Dominic
4718f68717 Based on feedback, made the gemini file something that can be called separately 2025-02-18 18:04:29 -08:00
Aleix Conchillo Flaqué
84ac88cad7 STTMuteFilter: change suppressed logging to trace 2025-02-18 18:03:37 -08:00
Aleix Conchillo Flaqué
211163e5c7 Merge pull request #1241 from pipecat-ai/aleix/deepgram-nova-3
deepgram: use the new nova-3 model as default
2025-02-18 17:53:04 -08:00
Aleix Conchillo Flaqué
1b0bcebef6 deepgram: use the new nova-3 model as default 2025-02-18 17:51:54 -08:00
Aleix Conchillo Flaqué
89736b03c4 Merge pull request #1243 from pipecat-ai/aleix/add-deepgram-addons
deepgram: add ability to provide custom addons
2025-02-18 17:47:48 -08:00
Aleix Conchillo Flaqué
4edda718ed deepgram: add ability to provide custom addons 2025-02-18 17:45:41 -08:00
Aleix Conchillo Flaqué
22a62edc9e Merge pull request #1242 from pipecat-ai/aleix/utils-network-exponential
network: added exponential_backoff_time() function
2025-02-18 17:44:21 -08:00
Aleix Conchillo Flaqué
50b6cc8135 network: added exponential_backoff_time() function 2025-02-18 17:42:43 -08:00
Aleix Conchillo Flaqué
45cf36925a Merge pull request #1240 from pipecat-ai/aleix/handle-deepgram-on-error
deepgram: handle error event and reconnect
2025-02-18 17:41:29 -08:00
Filipi da Silva Fuchter
83a71e1fec Merge pull request #1112 from pipecat-ai/bot-ready-signalling-rn
React Native client for the bot ready example.
2025-02-18 15:17:38 -03:00
Filipi Fuchter
e809c8680e Upgrading to use the latest node stable version 2025-02-18 15:12:44 -03:00
Aleix Conchillo Flaqué
c926063d74 deepgram: handle error event and reconnect 2025-02-18 09:52:18 -08:00
Aleix Conchillo Flaqué
0334550356 Merge pull request #1238 from pipecat-ai/aleix/stt-mute-filter-ignore-input-audio-frames
STTMuteFilter: ignore audio frames so no transcriptions are generated
2025-02-18 09:48:13 -08:00
Aleix Conchillo Flaqué
90b9dce710 STTMuteFilter: ignore audio frames so no transcriptions are generated 2025-02-17 19:59:05 -08:00
Dominic
3a781c786c Fixed typo 2025-02-17 10:22:06 -08:00
Dominic
a066e2bcfd Updated example to use Gemini 2025-02-17 10:17:59 -08:00
Filipi Fuchter
7e3e126730 Migrating the base API URL for the react native example to an .env file. 2025-01-30 10:42:16 -03:00
Filipi Fuchter
75ca0571bb Improving the layout from the bot ready react native demo. 2025-01-30 10:31:04 -03:00
Filipi Fuchter
a48e5d0714 Only sending the message when it is a remote audio track. 2025-01-30 10:14:37 -03:00
Filipi Fuchter
2b6a992207 Sending the app-message to start playing audio once the track has started. 2025-01-30 09:37:33 -03:00
Filipi Fuchter
24cf106ed2 Refactoring the code to ask for the room that it should connect. 2025-01-30 09:14:18 -03:00
Filipi Fuchter
95c8346cb5 Starting to create a react native client for the bot ready example. 2025-01-29 19:00:42 -03:00
36 changed files with 11999 additions and 55 deletions

15
.gitignore vendored
View File

@@ -32,6 +32,21 @@ fly.toml
# Example files
pipecat/examples/twilio-chatbot/templates/streams.xml
pipecat/examples/bot-ready-signalling/client/react-native/node_modules/
pipecat/examples/bot-ready-signalling/client/react-native/.expo/
pipecat/examples/bot-ready-signalling/client/react-native/dist/
pipecat/examples/bot-ready-signalling/client/react-native/npm-debug.*
pipecat/examples/bot-ready-signalling/client/react-native/*.jks
pipecat/examples/bot-ready-signalling/client/react-native/*.p8
pipecat/examples/bot-ready-signalling/client/react-native/*.p12
pipecat/examples/bot-ready-signalling/client/react-native/*.key
pipecat/examples/bot-ready-signalling/client/react-native/*.mobileprovision
pipecat/examples/bot-ready-signalling/client/react-native/*.orig.*
pipecat/examples/bot-ready-signalling/client/react-native/web-build/
# macOS
.DS_Store
# Documentation
docs/api/_build/

View File

@@ -5,6 +5,44 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### Added
- Added `room_url` property to `DailyTransport`.
- Added `addons` argument to `DeepgramSTTService`.
- Added `exponential_backoff_time()` to `utils.network` module.
### Changed
- `DeepgramSTTService` now uses the new `nova-3` model by default. If you want
to use the previous model you can pass `LiveOptions(model="nova-2-general")`.
(see https://deepgram.com/learn/introducing-nova-3-speech-to-text-api)
```python
stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
```
### Fixed
- Fixed a `FishAudioTTSService` issue where `TTSStoppedFrame` was not being
pushed.
- Fixed an issue that `start_callback` was not invoked for some LLM services.
- Fixed an issue that would cause `DeepgramSTTService` to stop working after an
error occurred (e.g. sudden network loss). If the network recovered we would
not reconnect.
- Fixed a `STTMuteFilter` issue that would not mute user audio frames causing
transcriptions to be generated by the STT service.
### Other
- Added Gemini support to `examples/phone-chatbot`.
## [0.0.57] - 2025-02-14
### Added

View File

@@ -0,0 +1 @@
22.14

View File

@@ -0,0 +1,60 @@
# React Native Implementation
Basic implementation using the [Pipecat React Native SDK](https://docs.pipecat.ai/client/react-native/introduction).
## Usage
### Expo requirements
This project cannot be used with an [Expo Go](https://docs.expo.dev/workflow/expo-go/) app because [it requires custom native code](https://docs.expo.io/workflow/customizing/).
When a project requires custom native code or a config plugin, we need to transition from using [Expo Go](https://docs.expo.dev/workflow/expo-go/)
to a [development build](https://docs.expo.dev/development/introduction/).
More details about the custom native code used by this demo can be found in [rn-daily-js-expo-config-plugin](https://github.com/daily-co/rn-daily-js-expo-config-plugin).
### Building remotely
If you do not have experience with Xcode and Android Studio builds or do not have them installed locally on your computer, you will need to follow [this guide from Expo to use EAS Build](https://docs.expo.dev/development/create-development-builds/#create-and-install-eas-build).
### Building locally
You will need to have installed locally on your computer:
- [Xcode](https://developer.apple.com/xcode/) to build for iOS;
- [Android Studio](https://developer.android.com/studio) to build for Android;
#### Install the demo dependencies
```bash
# Use the version of node specified in .nvmrc
nvm i
# Install dependencies
npm i
# Before a native app can be compiled, the native source code must be generated.
npx expo prebuild
# Configure the environment variable to connect to the local server
cp env.example .env
# edit .env and add your local ip address, for example: http://192.168.1.16:7860
```
#### Running on Android
After plugging in an Android device [configured for debugging](https://developer.android.com/studio/debug/dev-options), run the following command:
```
npm run android
```
#### Running on iOS
Run the following command:
```
npm run ios
```
#### Connect to the server
Use the http://localhost:5173 in your app.

View File

@@ -0,0 +1,75 @@
{
"expo": {
"name": "bot-ready-rn",
"slug": "bot-ready-rn",
"version": "1.0.0",
"orientation": "portrait",
"icon": "./assets/icon.png",
"userInterfaceStyle": "light",
"splash": {
"image": "./assets/splash.png",
"resizeMode": "contain",
"backgroundColor": "#ffffff"
},
"updates": {
"fallbackToCacheTimeout": 0
},
"assetBundlePatterns": [
"**/*"
],
"ios": {
"supportsTablet": true,
"bitcode": false,
"bundleIdentifier": "co.daily.expo.BotReady",
"infoPlist": {
"UIBackgroundModes": [
"voip"
]
},
"appleTeamId": "EEBGKV9N3N"
},
"android": {
"adaptiveIcon": {
"foregroundImage": "./assets/adaptive-icon.png",
"backgroundColor": "#FFFFFF"
},
"package": "co.daily.expo.BotReady",
"permissions": [
"android.permission.ACCESS_NETWORK_STATE",
"android.permission.BLUETOOTH",
"android.permission.CAMERA",
"android.permission.INTERNET",
"android.permission.MODIFY_AUDIO_SETTINGS",
"android.permission.RECORD_AUDIO",
"android.permission.SYSTEM_ALERT_WINDOW",
"android.permission.WAKE_LOCK",
"android.permission.FOREGROUND_SERVICE",
"android.permission.FOREGROUND_SERVICE_CAMERA",
"android.permission.FOREGROUND_SERVICE_MICROPHONE",
"android.permission.FOREGROUND_SERVICE_MEDIA_PROJECTION",
"android.permission.POST_NOTIFICATIONS"
]
},
"web": {
"favicon": "./assets/favicon.png"
},
"plugins": [
"@config-plugins/react-native-webrtc",
"@daily-co/config-plugin-rn-daily-js",
[
"expo-build-properties",
{
"android": {
"minSdkVersion": 24,
"compileSdkVersion": 35,
"targetSdkVersion": 34,
"buildToolsVersion": "35.0.0"
},
"ios": {
"deploymentTarget": "15.1"
}
}
]
]
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

View File

@@ -0,0 +1,7 @@
module.exports = function(api) {
api.cache(true);
return {
presets: ['babel-preset-expo'],
plugins: [["module:react-native-dotenv"]],
};
};

View File

@@ -0,0 +1 @@
API_BASE_URL=http://YOUR_LOCAL_IP:7860

View File

@@ -0,0 +1,7 @@
import { registerRootComponent } from "expo";
import App from "./src/App";
// registerRootComponent calls AppRegistry.registerComponent('main', () => App);
// It also ensures that the environment is set up appropriately
registerRootComponent(App);

View File

@@ -0,0 +1,4 @@
// Learn more https://docs.expo.io/guides/customizing-metro
const { getDefaultConfig } = require('expo/metro-config');
module.exports = getDefaultConfig(__dirname);

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,31 @@
{
"name": "bot-ready-rn",
"version": "1.0.0",
"scripts": {
"start": "expo start --dev-client",
"android": "expo run:android --device",
"ios": "expo run:ios --device",
"web": "expo start --web"
},
"dependencies": {
"@config-plugins/react-native-webrtc": "^10.0.0",
"@daily-co/config-plugin-rn-daily-js": "0.0.7",
"@daily-co/react-native-daily-js": "^0.70.0",
"@daily-co/react-native-webrtc": "^118.0.3-daily.2",
"@react-native-async-storage/async-storage": "1.23.1",
"expo": "^52.0.0",
"expo-build-properties": "~0.13.1",
"expo-dev-client": "~5.0.5",
"expo-splash-screen": "~0.29.16",
"expo-status-bar": "~2.0.0",
"react": "18.3.1",
"react-native": "0.76.3",
"react-native-background-timer": "^2.4.1",
"react-native-dotenv": "^3.4.11",
"react-native-get-random-values": "^1.11.0"
},
"devDependencies": {
"@babel/core": "^7.12.9"
},
"private": true
}

View File

@@ -0,0 +1,121 @@
import React, { useState, useEffect } from 'react';
import {SafeAreaView, View, Text, Button, StyleSheet, ScrollView} from 'react-native';
import Daily from "@daily-co/react-native-daily-js";
import { API_BASE_URL } from "@env";
const CallScreen = () => {
const [connectionStatus, setConnectionStatus] = useState('Disconnected');
const [isConnected, setIsConnected] = useState(false);
const [callObject, setCallObject] = useState(null);
const [logs, setLogs] = useState([]);
useEffect(() => {
if (callObject) {
setupTrackListeners(callObject);
}
}, [callObject]);
const log = (message) => {
setLogs((prevLogs) => [...prevLogs, `${new Date().toISOString()} - ${message}`]);
console.log(message);
};
const setupTrackListeners = (callObject) => {
callObject.on("joined-meeting", () => {
setConnectionStatus('Connected');
setIsConnected(true);
log('Client connected');
});
callObject.on("left-meeting", () => {
setConnectionStatus('Disconnected');
setIsConnected(false);
log('Client disconnected');
});
callObject.on("participant-left", () => {
// When the bot leaves, we are also disconnecting from the call
disconnect().catch((err) => {
log(`Failed to disconnect ${err}`);
})
});
// Trigger so the bot can start sending audio
callObject.on("track-started", (evt) => {
if (evt.track.kind === "audio" && evt.participant.local === false) {
handleEventToConsole(evt)
log("Sending the message that will trigger the bot to play the audio.")
callObject.sendAppMessage("playable")
}
});
callObject.on("error", (evt) => log(`Error: ${evt.error}`));
// Other events just for awareness
callObject.on("track-stopped", handleEventToConsole);
callObject.on("participant-joined", handleEventToConsole);
callObject.on("participant-updated", handleEventToConsole);
};
const handleEventToConsole = (evt) => {
log(`Received event: ${evt.action}`);
};
const connect = async () => {
try {
const callObject = Daily.createCallObject({ subscribeToTracksAutomatically: true });
setCallObject(callObject);
const connectionUrl = `${API_BASE_URL}/connect`
const res = await fetch(connectionUrl, { method: "POST", headers: { "Content-Type": "application/json" } });
const roomInfo = await res.json();
await callObject.join({ url: roomInfo.room_url });
} catch (error) {
log(`Error connecting: ${error.message}`);
}
};
const disconnect = async () => {
if (callObject) {
try {
await callObject.leave();
await callObject.destroy();
setCallObject(null);
} catch (error) {
log(`Error disconnecting: ${error.message}`);
}
}
};
return (
<SafeAreaView style={styles.safeArea}>
<View style={styles.container}>
<View style={styles.statusBar}>
<Text>Status: <Text style={styles.status}>{connectionStatus}</Text></Text>
<View style={styles.controls}>
<Button
title={isConnected ? "Disconnect" : "Connect"}
onPress={isConnected ? disconnect : connect}
/>
</View>
</View>
<View style={styles.debugPanel}>
<Text style={styles.debugTitle}>Debug Info</Text>
<ScrollView style={styles.debugLog}>
{logs.map((logEntry, index) => (
<Text key={index} style={styles.logText}>{logEntry}</Text>
))}
</ScrollView>
</View>
</View>
</SafeAreaView>
);
};
const styles = StyleSheet.create({
safeArea: { flex: 1, backgroundColor: '#f0f0f0', padding: 20 },
container: { flex: 1, margin: 20 },
statusBar: { flexDirection: 'row', justifyContent: 'space-between', alignItems: 'center', padding: 10, backgroundColor: '#fff', borderRadius: 8, marginBottom: 20 },
status: { fontWeight: 'bold' },
controls: { flexDirection: 'row', gap: 10 },
debugPanel: { height: '80%', backgroundColor: '#fff', borderRadius: 8, padding: 20},
debugTitle: { fontSize: 16, fontWeight: 'bold' },
debugLog: { height: '100%', overflow: 'scroll', backgroundColor: '#f8f8f8', padding: 10, borderRadius: 4, fontFamily: 'monospace', fontSize: 12, lineHeight: 1.4 },
});
export default CallScreen;

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -30,6 +31,12 @@ logger.add(sys.stderr, level="DEBUG")
video_participant_id = None
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
@@ -63,7 +70,7 @@ async def main():
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
llm.register_function("get_weather", get_weather)
llm.register_function("get_weather", get_weather, start_fetch_weather)
llm.register_function("get_image", get_image)
tools = [

View File

@@ -1,3 +1,5 @@
<!-- @format -->
<div align="center">
 <img alt="pipecat" width="300px" height="auto" src="image.png">
</div>
@@ -104,6 +106,41 @@ curl -X POST "http://localhost:7860/daily_start_bot" \
-d '{"dialoutNumber": "+18057145330", "detectVoicemail": true}'
```
### New! Using Gemini 2.0 Flash Lite with Daily
We have introduced support for Google's Gemini 2.0 Flash Lite model in this example. This lightweight model offers faster response times and reduced costs while maintaining good conversational capabilities.
**Quick Start**
To use the Gemini-based bot instead of OpenAI:
```shell
curl -X POST "http://localhost:7860/daily_gemini_start_bot" \ py pipecat
-H "Content-Type: application/json" \
-d '{"detectVoicemail": true}'
```
All request body parameters supported by /daily_start_bot (such as detectVoicemail, dialoutNumber, etc.) are also compatible with /daily_gemini_start_bot.
This example uses context switching to help steer the bot in the right direction. As Flash Lite is a smaller model, getting it to consistently call functions was difficult for these longer prompts. Breaking the prompt
down into smaller pieces helped improve the accuracy of the bot.
**Implementation Details**
The implementation is available in bot_daily_gemini.py and features:
Staged prompting approach: Breaking down complex tasks into smaller, more focused prompts to improve the lightweight model's performance
Dynamic context switching: The bot can change its behavior in real-time based on what it detects (voicemail vs. human caller)
Function-based architecture: Uses function calling to trigger context switches and call termination
**Optimizations for Lightweight Models**
Working with Gemini 2.0 Flash Lite required some specific optimizations:
Simplified prompts: Each prompt focuses on a single task with clear instructions
Function-driven state changes: The model calls specific functions to switch between different conversation modes
Reduced context requirements: Each stage maintains only the context needed for its specific purpose
This approach significantly improves the consistency of function calling in this lightweight model, which was challenging with longer, more complex prompts.
### More information
For more configuration options, please consult [Daily's API documentation](https://docs.daily.co).

View File

@@ -49,7 +49,11 @@ async def main(
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
# We don't want to specify dialin settings if we're not dialing in
dialin_settings = None
if callId and callDomain:
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
transport = DailyTransport(
room_url,
token,
@@ -96,8 +100,16 @@ async def main(
- **"Please leave a message after the beep."**
- **"No one is available to take your call."**
- **"Record your message after the tone."**
- **"Please leave a message after the beep"**
- **"You have reached voicemail for..."**
- **"You have reached [phone number]"**
- **"[phone number] is unavailable"**
- **"The person you are trying to reach..."**
- **"The number you have dialed..."**
- **"Your call has been forwarded to an automated voice messaging system"**
- **Any phrase that suggests an answering machine or voicemail.**
- **ASSUME IT IS A VOICEMAIL. DO NOT WAIT FOR MORE CONFIRMATION.**
- **IF THE CALL SAYS "PLEASE LEAVE A MESSAGE AFTER THE BEEP", WAIT FOR THE BEEP BEFORE LEAVING A MESSAGE.**
#### **Step 2: Leave a Voicemail Message**
- Immediately say:
@@ -110,7 +122,9 @@ async def main(
- If the call is answered by a human, say:
*"Oh, hello! I'm a friendly chatbot. Is there anything I can help you with?"*
- Keep responses **brief and helpful**.
- If the user no longer needs assistance, **call `terminate_call` immediately.**
- If the user no longer needs assistance, say:
*"Okay, thank you! Have a great day!"*
-**Then call `terminate_call` immediately.**
---

View File

@@ -0,0 +1,393 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
import sys
from dataclasses import dataclass
from typing import Optional
import google.ai.generativelanguage as glm
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
EndTaskFrame,
Frame,
InputAudioRawFrame,
StopTaskFrame,
SystemFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.ai_services import LLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
from pipecat.transports.services.daily import DailyDialinSettings, DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
system_message = None
class UserAudioCollector(FrameProcessor):
"""This FrameProcessor collects audio frames in a buffer, then adds them to the
LLM context when the user stops speaking.
"""
def __init__(self, context, user_context_aggregator):
super().__init__()
self._context = context
self._user_context_aggregator = user_context_aggregator
self._audio_frames = []
self._start_secs = 0.2 # this should match VAD start_secs (hardcoding for now)
self._user_speaking = False
async def process_frame(self, frame, direction):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
# We could gracefully handle both audio input and text/transcription input ...
# but let's leave that as an exercise to the reader. :-)
return
if isinstance(frame, UserStartedSpeakingFrame):
self._user_speaking = True
elif isinstance(frame, UserStoppedSpeakingFrame):
self._user_speaking = False
self._context.add_audio_frames_message(audio_frames=self._audio_frames)
await self._user_context_aggregator.push_frame(
self._user_context_aggregator.get_context_frame()
)
elif isinstance(frame, InputAudioRawFrame):
if self._user_speaking:
self._audio_frames.append(frame)
else:
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest
# frames as necessary. Assume all audio frames have the same duration.
self._audio_frames.append(frame)
frame_duration = len(frame.audio) / 16 * frame.num_channels / frame.sample_rate
buffer_duration = frame_duration * len(self._audio_frames)
while buffer_duration > self._start_secs:
self._audio_frames.pop(0)
buffer_duration -= frame_duration
await self.push_frame(frame, direction)
class ContextSwitcher:
def __init__(self, llm, context_aggregator):
self._llm = llm
self._context_aggregator = context_aggregator
async def switch_context(self, system_instruction):
"""Switch the context to a new system instruction based on what the bot hears."""
# Create messages with updated system instruction
messages = [
{
"role": "system",
"content": system_instruction,
}
]
# Update context with new messages
self._context_aggregator.set_messages(messages)
# Get the context frame with the updated messages
context_frame = self._context_aggregator.get_context_frame()
# Trigger LLM response by pushing a context frame
await self._llm.push_frame(context_frame)
class FunctionHandlers:
def __init__(self, context_switcher):
self.context_switcher = context_switcher
async def voicemail_response(
self, function_name, tool_call_id, args, llm, context, result_callback
):
"""Function the bot can call to leave a voicemail message."""
print(f"!!! Got a voicemail response, llm is: {llm}")
system_message = """You are Chatbot leaving a voicemail message. Say EXACTLY this message and nothing else:
"Hello, this is a message for Pipecat example user. This is Chatbot. Please call back on 123-456-7891. Thank you."
After saying this message, call the terminate_call function."""
print("!!! about to push stop task frame from voicemail")
await llm.queue_frame(StopTaskFrame(), FrameDirection.UPSTREAM)
print("!!! pushed stop task frame from voicemail")
await result_callback("Goodbye")
async def human_conversation(
self, function_name, tool_call_id, args, llm, context, result_callback
):
"""Function the bot can when it detects it's talking to a human."""
print(f"!!! Got a human response, llm is: {llm}")
system_message = """You are Chatbot talking to a human. Be friendly and helpful.
Start with: "Hello! I'm a friendly chatbot. How can I help you today?"
Keep your responses brief and to the point. Listen to what the person says.
When the person indicates they're done with the conversation by saying something like:
- "Goodbye"
- "That's all"
- "I'm done"
- "Thank you, that's all I needed"
THEN say: "Thank you for chatting. Goodbye!" and call the terminate_call function."""
print("!!! about to push stop task frame from human")
await llm.queue_frame(StopTaskFrame(), FrameDirection.UPSTREAM)
print("!!! pushed stop task frame from human")
await result_callback("Goodbye")
async def terminate_call(
function_name, tool_call_id, args, llm: LLMService, context, result_callback
):
"""Function the bot can call to terminate the call upon completion of the call."""
await llm.queue_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
async def main(
room_url: str,
token: str,
callId: str,
callDomain: str,
detect_voicemail: bool,
dialout_number: Optional[str],
):
# dialin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
# We don't want to specify dialin settings if we're not dialing in
dialin_settings = None
if callId and callDomain:
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
dialin_settings=dialin_settings,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
# transcription_enabled=True,
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
tools = [
{
"function_declarations": [
{
"name": "switch_to_voicemail_response",
"description": "Call this function when you detect this is a voicemail system.",
},
{
"name": "switch_to_human_conversation",
"description": "Call this function when you detect this is a human.",
},
{
"name": "terminate_call",
"description": "Call this function to terminate the call.",
},
]
}
]
system_instruction = """You are Chatbot trying to determine if this is a voicemail system or a human.
If you hear any of these phrases (or very similar ones):
- "Please leave a message after the beep"
- "No one is available to take your call"
- "Record your message after the tone"
- "You have reached voicemail for..."
- "You have reached [phone number]"
- "[phone number] is unavailable"
- "The person you are trying to reach..."
- "The number you have dialed..."
- "Your call has been forwarded to an automated voice messaging system"
Then call the function switch_to_voicemail_response.
If it sounds like a human (saying hello, asking questions, etc.), call the function switch_to_human_conversation.
DO NOT say anything until you've determined if this is a voicemail or human."""
greeting_llm = GoogleLLMService(
model="models/gemini-2.0-flash-lite-preview-02-05",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,
)
greeting_context = GoogleLLMContext()
greeting_context_aggregator = greeting_llm.create_context_aggregator(greeting_context)
greeting_audio_collector = UserAudioCollector(
greeting_context, greeting_context_aggregator.user()
)
context_switcher = ContextSwitcher(greeting_llm, greeting_context_aggregator.user())
handlers = FunctionHandlers(context_switcher)
greeting_llm.register_function("switch_to_voicemail_response", handlers.voicemail_response)
greeting_llm.register_function("switch_to_human_conversation", handlers.human_conversation)
greeting_llm.register_function("terminate_call", terminate_call)
greeting_pipeline = Pipeline(
[
transport.input(), # Transport user input
greeting_audio_collector, # Collect audio frames
greeting_context_aggregator.user(), # User responses
greeting_llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
greeting_context_aggregator.assistant(), # Assistant spoken responses
]
)
greeting_pipeline_task = PipelineTask(
greeting_pipeline,
PipelineParams(allow_interruptions=True),
)
runner = PipelineRunner()
print("!!! starting greeting")
await runner.run(greeting_pipeline_task)
print("!!! Done with greeting")
# Create conversation pipeline with new system message
conversation_llm = GoogleLLMService(
model="models/gemini-2.0-flash-lite-preview-02-05",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_message if system_message else "You are a helpful chatbot.",
tools=[
{
"function_declarations": [
{
"name": "terminate_call",
"description": "Call this function to terminate the call.",
}
]
}
],
)
conversation_llm.register_function("terminate_call", terminate_call)
conversation_context = GoogleLLMContext()
conversation_context_aggregator = conversation_llm.create_context_aggregator(
conversation_context
)
conversation_audio_collector = UserAudioCollector(
conversation_context, conversation_context_aggregator.user()
)
conversation_pipeline = Pipeline(
[
transport.input(), # Transport user input
conversation_audio_collector, # Collect audio frames
conversation_context_aggregator.user(), # User responses
conversation_llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
conversation_context_aggregator.assistant(), # Assistant spoken responses
]
)
conversation_task = PipelineTask(
conversation_pipeline,
PipelineParams(allow_interruptions=True),
)
if dialout_number:
logger.debug("dialout number detected; doing dialout")
# Configure some handlers for dialing out
@transport.event_handler("on_joined")
async def on_joined(transport, data):
logger.debug(f"Joined; starting dialout to: {dialout_number}")
await transport.start_dialout({"phoneNumber": dialout_number})
@transport.event_handler("on_dialout_connected")
async def on_dialout_connected(transport, data):
logger.debug(f"Dial-out connected: {data}")
@transport.event_handler("on_dialout_answered")
async def on_dialout_answered(transport, data):
logger.debug(f"Dial-out answered: {data}")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# unlike the dialin case, for the dialout case, the caller will speak first. Presumably
# they will answer the phone and say "Hello?" Since we've captured their transcript,
# That will put a frame into the pipeline and prompt an LLM completion, which is how the
# bot will then greet the user.
elif detect_voicemail:
logger.debug("Detect voicemail example. You can test this in example in Daily Prebuilt")
# For the voicemail detection case, we do not want the bot to answer the phone. We want it to wait for the voicemail
# machine to say something like 'Leave a message after the beep', or for the user to say 'Hello?'.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
else:
logger.debug("no dialout number; assuming dialin")
# Different handlers for dialin
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# For the dialin case, we want the bot to answer the phone and greet the user. We
# can prompt the bot to speak by putting the context into the pipeline.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await conversation_task.cancel()
print("!!! Starting conversation")
await runner.run(conversation_task)
print("!!! Done with conversation")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Simple ChatBot")
parser.add_argument("-u", type=str, help="Room URL")
parser.add_argument("-t", type=str, help="Token")
parser.add_argument("-i", type=str, help="Call ID")
parser.add_argument("-d", type=str, help="Call Domain")
parser.add_argument("-v", action="store_true", help="Detect voicemail")
parser.add_argument("-o", type=str, help="Dialout number", default=None)
config = parser.parse_args()
asyncio.run(main(config.u, config.t, config.i, config.d, config.v, config.o))

View File

@@ -110,10 +110,15 @@ async def _create_daily_room(
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
print(f"Vendor: {vendor}")
if vendor == "daily":
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {callId} -d {callDomain}{' -v' if detect_voicemail else ''}"
if dialoutNumber:
bot_proc += f" -o {dialoutNumber}"
elif vendor == "daily-gemini":
bot_proc = f"python3 -m bot_daily_gemini -u {room.url} -t {token} -i {callId} -d {callDomain}{' -v' if detect_voicemail else ''}"
if dialoutNumber:
bot_proc += f" -o {dialoutNumber}"
else:
bot_proc = f"python3 -m bot_twilio -u {room.url} -t {token} -i {callId} -s {room.config.sip_endpoint}"
@@ -201,6 +206,38 @@ async def daily_start_bot(request: Request) -> JSONResponse:
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
@app.post("/daily_gemini_start_bot")
async def daily_gemini_start_bot(request: Request) -> JSONResponse:
# The /daily_start_bot is invoked when a call is received on Daily's SIP URI
# daily_start_bot will create the room, put the call on hold until
# the bot and sip worker are ready. Daily will automatically
# forward the call to the SIP URi when dialin_ready fires.
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
# Get the dial-in properties from the request
try:
data = await request.json()
if "test" in data:
# Pass through any webhook checks
return JSONResponse({"test": True})
detect_voicemail = data.get("detectVoicemail", False)
callId = data.get("callId", None)
callDomain = data.get("callDomain", None)
dialoutNumber = data.get("dialoutNumber", None)
except Exception:
raise HTTPException(
status_code=500, detail="Missing properties 'callId', 'callDomain', or 'dialoutNumber'"
)
room: DailyRoomObject = await _create_daily_room(
room_url, callId, callDomain, dialoutNumber, "daily-gemini", detect_voicemail
)
# Grab a token for the user to join with
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
# ----------------- Main ----------------- #

View File

@@ -24,17 +24,15 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import (
DailyParams,
DailyTransport,
)
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -166,21 +164,32 @@ async def main():
async def on_transcript_update(processor, frame):
await transcript_handler.on_transcript_update(processor, frame)
rtvi = RTVIProcessor()
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
transcript.user(), # User transcripts
tp,
llm,
tts,
transport.output(),
transcript.assistant(),
context_aggregator.assistant(),
transcript.assistant(), # Assistant transcripts
]
)
task = PipelineTask(pipeline)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=False, # We don't want to interrupt the translator bot
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -1,4 +1,4 @@
python-dotenv
fastapi[all]
pipecat-ai[daily,openai,azure]
pipecat-ai[cartesia,daily,deepgram,openai,silero]
aiohttp

View File

@@ -23,6 +23,7 @@ from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InputAudioRawFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
@@ -185,13 +186,14 @@ class STTMuteFilter(FrameProcessor):
StopInterruptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
InputAudioRawFrame,
),
):
# Only pass VAD-related frames when not muted
if not self.is_muted:
await self.push_frame(frame, direction)
else:
logger.debug(f"{frame.__class__.__name__} suppressed - STT currently muted")
logger.trace(f"{frame.__class__.__name__} suppressed - STT currently muted")
else:
# Pass all other frames through
await self.push_frame(frame, direction)

View File

@@ -175,6 +175,7 @@ class LLMService(AIService):
f = self._callbacks[None]
else:
return None
await self.call_start_function(context, function_name)
await context.call_function(
f,
function_name=function_name,
@@ -208,7 +209,7 @@ class TTSService(AIService):
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
push_stop_frames: bool = False,
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
stop_frame_timeout_s: float = 1.0,
stop_frame_timeout_s: float = 2.0,
# if True, TTSService will push silence audio frames after TTSStoppedFrame
push_silence_after_stop: bool = False,
# if push_silence_after_stop is True, send this amount of audio silence

View File

@@ -5,7 +5,7 @@
#
import asyncio
from typing import AsyncGenerator, Optional
from typing import AsyncGenerator, Dict, Optional
from loguru import logger
@@ -34,6 +34,7 @@ try:
AsyncListenWebSocketClient,
DeepgramClient,
DeepgramClientOptions,
ErrorResponse,
LiveOptions,
LiveResultResponse,
LiveTranscriptionEvents,
@@ -120,6 +121,7 @@ class DeepgramSTTService(STTService):
url: str = "",
sample_rate: Optional[int] = None,
live_options: Optional[LiveOptions] = None,
addons: Optional[Dict] = None,
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
@@ -127,7 +129,7 @@ class DeepgramSTTService(STTService):
default_options = LiveOptions(
encoding="linear16",
language=Language.EN,
model="nova-2-general",
model="nova-3-general",
channels=1,
interim_results=True,
smart_format=True,
@@ -147,6 +149,7 @@ class DeepgramSTTService(STTService):
merged_options.language = merged_options.language.value
self._settings = merged_options.to_dict()
self._addons = addons
self._client = DeepgramClient(
api_key,
@@ -155,13 +158,10 @@ class DeepgramSTTService(STTService):
options={"keepalive": "true"}, # verbose=logging.DEBUG
),
)
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
if self.vad_enabled:
self._register_event_handler("on_speech_started")
self._register_event_handler("on_utterance_end")
self._connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)
self._connection.on(LiveTranscriptionEvents.UtteranceEnd, self._on_utterance_end)
@property
def vad_enabled(self):
@@ -202,7 +202,25 @@ class DeepgramSTTService(STTService):
async def _connect(self):
logger.debug("Connecting to Deepgram")
if not await self._connection.start(self._settings):
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), self._on_message
)
self._connection.on(LiveTranscriptionEvents(LiveTranscriptionEvents.Error), self._on_error)
if self.vad_enabled:
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted),
self._on_speech_started,
)
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd),
self._on_utterance_end,
)
if not await self._connection.start(options=self._settings, addons=self._addons):
logger.error(f"{self}: unable to connect to Deepgram")
async def _disconnect(self):
@@ -214,6 +232,15 @@ class DeepgramSTTService(STTService):
await self.start_ttfb_metrics()
await self.start_processing_metrics()
async def _on_error(self, *args, **kwargs):
error: ErrorResponse = kwargs["error"]
logger.warning(f"{self} connection error, will retry: {error}")
await self.stop_all_metrics()
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
# because this triggers more errors internally in the Deepgram SDK. So,
# we just forget about the previous connection and create a new one.
await self._connect()
async def _on_speech_started(self, *args, **kwargs):
await self.start_metrics()
await self._call_event_handler("on_speech_started", *args, **kwargs)

View File

@@ -191,7 +191,6 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,

View File

@@ -11,16 +11,13 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
@@ -60,7 +57,12 @@ class FishAudioTTSService(TTSService, WebsocketService):
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(pause_frame_processing=True, sample_rate=sample_rate, **kwargs)
super().__init__(
push_stop_frames=True,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,
)
self._api_key = api_key
self._base_url = "wss://api.fish.audio/v1/tts/live"

View File

@@ -266,7 +266,6 @@ class BaseOpenAILLMService(LLMService):
if tool_call.function and tool_call.function.name:
function_name += tool_call.function.name
tool_call_id = tool_call.id
await self.call_start_function(context, function_name)
if tool_call.function and tool_call.function.arguments:
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments

View File

@@ -14,16 +14,13 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
@@ -100,7 +97,6 @@ class RimeTTSService(AudioContextWordTTSService, WebsocketService):
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,

View File

@@ -13,6 +13,7 @@ from loguru import logger
from websockets.protocol import State
from pipecat.frames.frames import ErrorFrame
from pipecat.utils.network import exponential_backoff_time
class WebsocketService(ABC):
@@ -51,27 +52,6 @@ class WebsocketService(ABC):
await self._connect_websocket()
return await self._verify_connection()
def _calculate_wait_time(
self, attempt: int, min_wait: float = 4, max_wait: float = 10, multiplier: float = 1
) -> float:
"""Calculate exponential backoff wait time.
Args:
attempt: Current attempt number (1-based)
min_wait: Minimum wait time in seconds
max_wait: Maximum wait time in seconds
multiplier: Base multiplier for exponential calculation
Returns:
Wait time in seconds
"""
try:
exp = 2 ** (attempt - 1) * multiplier
result = max(0, min(exp, max_wait))
return max(min_wait, result)
except (ValueError, ArithmeticError):
return max_wait
async def _receive_task_handler(self, report_error: Callable[[ErrorFrame], Awaitable[None]]):
"""Handles WebSocket message receiving with automatic retry logic.
@@ -104,7 +84,7 @@ class WebsocketService(ABC):
try:
if await self._reconnect_websocket(retry_count):
retry_count = 0 # Reset counter on successful reconnection
wait_time = self._calculate_wait_time(retry_count)
wait_time = exponential_backoff_time(retry_count)
await asyncio.sleep(wait_time)
except Exception as reconnect_error:
logger.error(f"{self} reconnection failed: {reconnect_error}")

View File

@@ -174,11 +174,9 @@ class BaseInputTransport(FrameProcessor):
async def _vad_analyze(self, audio_frame: InputAudioRawFrame) -> VADState:
state = VADState.QUIET
if self.vad_analyzer:
logger.trace(f"{self}: analyzing VAD on {audio_frame}")
state = await self.get_event_loop().run_in_executor(
self._executor, self.vad_analyzer.analyze_audio, audio_frame.audio
)
logger.trace(f"{self}: done analyzing VAD on {audio_frame}")
return state
async def _handle_vad(self, audio_frame: InputAudioRawFrame, vad_state: VADState):

View File

@@ -329,6 +329,10 @@ class DailyTransportClient(EventHandler):
def _speaker_name(self):
return f"speaker-{self}"
@property
def room_url(self) -> str:
return self._room_url
@property
def participant_id(self) -> str:
return self._participant_id
@@ -1112,6 +1116,10 @@ class DailyTransport(BaseTransport):
# DailyTransport
#
@property
def room_url(self) -> str:
return self._client.room_url
@property
def participant_id(self) -> str:
return self._client.participant_id

View File

@@ -0,0 +1,27 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
def exponential_backoff_time(
attempt: int, min_wait: float = 4, max_wait: float = 10, multiplier: float = 1
) -> float:
"""Calculate exponential backoff wait time.
Args:
attempt: Current attempt number (1-based)
min_wait: Minimum wait time in seconds
max_wait: Maximum wait time in seconds
multiplier: Base multiplier for exponential calculation
Returns:
Wait time in seconds
"""
try:
exp = 2 ** (attempt - 1) * multiplier
result = max(0, min(exp, max_wait))
return max(min_wait, result)
except (ValueError, ArithmeticError):
return max_wait

View File

@@ -11,12 +11,13 @@ from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InputAudioRawFrame,
STTMuteFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.filters.stt_mute_filter import STTMuteConfig, STTMuteFilter, STTMuteStrategy
from pipecat.tests.utils import run_test
from pipecat.tests.utils import SleepFrame, run_test
class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
@@ -26,10 +27,14 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
BotStartedSpeakingFrame(), # First bot speech starts
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # First bot speech ends
BotStartedSpeakingFrame(), # Second bot speech
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStoppedSpeakingFrame(),
]
@@ -41,6 +46,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
STTMuteFrame, # mute=False
BotStartedSpeakingFrame,
UserStartedSpeakingFrame, # Now passes through
InputAudioRawFrame, # Now passes through
UserStoppedSpeakingFrame, # Now passes through
BotStoppedSpeakingFrame,
]
@@ -57,12 +63,19 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
BotStartedSpeakingFrame(), # First speech starts
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # First speech ends
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStartedSpeakingFrame(), # Second speech starts
UserStartedSpeakingFrame(), # Should be suppressed again
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed again
UserStoppedSpeakingFrame(), # Should be suppressed again
BotStoppedSpeakingFrame(), # Second speech ends
]
@@ -73,6 +86,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
BotStoppedSpeakingFrame,
STTMuteFrame, # mute=False
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
STTMuteFrame, # mute=True
@@ -134,15 +148,23 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
UserStartedSpeakingFrame(), # Should be suppressed (starts muted)
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStartedSpeakingFrame(), # First bot speech
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # First speech ends, unmutes
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStartedSpeakingFrame(), # Second speech
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStoppedSpeakingFrame(),
]
@@ -153,9 +175,11 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
BotStoppedSpeakingFrame,
STTMuteFrame, # mute=False after first speech
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStoppedSpeakingFrame,
]
@@ -190,23 +214,30 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStartedSpeakingFrame(), # Bot starts speaking
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # Bot stops speaking
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
]
expected_returned_frames = [
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
STTMuteFrame, # mute=True
BotStoppedSpeakingFrame,
STTMuteFrame, # mute=False
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
]

View File

@@ -0,0 +1,34 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.utils.network import exponential_backoff_time
class TestUtilsNetwork(unittest.IsolatedAsyncioTestCase):
async def test_exponential_backoff_time(self):
# min_wait=4, max_wait=10, multiplier=1
assert exponential_backoff_time(attempt=1, min_wait=4, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=2, min_wait=4, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=3, min_wait=4, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=4, min_wait=4, max_wait=10, multiplier=1) == 8
assert exponential_backoff_time(attempt=5, min_wait=4, max_wait=10, multiplier=1) == 10
assert exponential_backoff_time(attempt=6, min_wait=4, max_wait=10, multiplier=1) == 10
# min_wait=1, max_wait=10, multiplier=1
assert exponential_backoff_time(attempt=1, min_wait=1, max_wait=10, multiplier=1) == 1
assert exponential_backoff_time(attempt=2, min_wait=1, max_wait=10, multiplier=1) == 2
assert exponential_backoff_time(attempt=3, min_wait=1, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=4, min_wait=1, max_wait=10, multiplier=1) == 8
assert exponential_backoff_time(attempt=5, min_wait=1, max_wait=10, multiplier=1) == 10
assert exponential_backoff_time(attempt=6, min_wait=1, max_wait=10, multiplier=1) == 10
# min_wait=1, max_wait=20, multiplier=2
assert exponential_backoff_time(attempt=1, min_wait=1, max_wait=20, multiplier=2) == 2
assert exponential_backoff_time(attempt=2, min_wait=1, max_wait=20, multiplier=2) == 4
assert exponential_backoff_time(attempt=3, min_wait=1, max_wait=20, multiplier=2) == 8
assert exponential_backoff_time(attempt=4, min_wait=1, max_wait=20, multiplier=2) == 16
assert exponential_backoff_time(attempt=5, min_wait=1, max_wait=20, multiplier=2) == 20
assert exponential_backoff_time(attempt=6, min_wait=1, max_wait=20, multiplier=2) == 20