Compare commits
69 Commits
hush/nova3
...
mb/db-rime
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
713d20e4fc | ||
|
|
adc45bd282 | ||
|
|
fc544fa61c | ||
|
|
976fe95304 | ||
|
|
408270b647 | ||
|
|
1dfb75bc9d | ||
|
|
cefc2a1088 | ||
|
|
3b9b9200ea | ||
|
|
d6f29a0f4b | ||
|
|
5b762d11ef | ||
|
|
2f3e2da6b9 | ||
|
|
45058d4a94 | ||
|
|
5b637bd826 | ||
|
|
2d4fd7e903 | ||
|
|
b5662520aa | ||
|
|
af45c170b5 | ||
|
|
65f548b2ec | ||
|
|
b29ab8c608 | ||
|
|
d6dc37f0b6 | ||
|
|
12bce2e8c0 | ||
|
|
4acf7296e0 | ||
|
|
98706d429c | ||
|
|
41720b1a13 | ||
|
|
3ef4245166 | ||
|
|
3bb0797922 | ||
|
|
7c7b4c52af | ||
|
|
01f083b7fc | ||
|
|
91fcaebe25 | ||
|
|
9c5fe5c85e | ||
|
|
7e5e167a4b | ||
|
|
d04c4b36f3 | ||
|
|
a811e53626 | ||
|
|
df57202a05 | ||
|
|
69e6f3fdb7 | ||
|
|
6809254963 | ||
|
|
81093d3bed | ||
|
|
d9a67164f6 | ||
|
|
98259af54e | ||
|
|
039d144c79 | ||
|
|
d0f67fc189 | ||
|
|
6e3f96aa83 | ||
|
|
293677588d | ||
|
|
77e777b1ce | ||
|
|
7e7926059c | ||
|
|
c948754eff | ||
|
|
83f1a8830d | ||
|
|
80f8e05fcf | ||
|
|
afd1a1e80b | ||
|
|
84ac88cad7 | ||
|
|
211163e5c7 | ||
|
|
1b0bcebef6 | ||
|
|
89736b03c4 | ||
|
|
4edda718ed | ||
|
|
22a62edc9e | ||
|
|
50b6cc8135 | ||
|
|
45cf36925a | ||
|
|
83a71e1fec | ||
|
|
e809c8680e | ||
|
|
c926063d74 | ||
|
|
0334550356 | ||
|
|
90b9dce710 | ||
|
|
a5cdd5f1b8 | ||
|
|
5f937b8479 | ||
|
|
7e3e126730 | ||
|
|
75ca0571bb | ||
|
|
a48e5d0714 | ||
|
|
2b6a992207 | ||
|
|
24cf106ed2 | ||
|
|
95c8346cb5 |
15
.gitignore
vendored
15
.gitignore
vendored
@@ -32,6 +32,21 @@ fly.toml
|
||||
|
||||
# Example files
|
||||
pipecat/examples/twilio-chatbot/templates/streams.xml
|
||||
pipecat/examples/bot-ready-signalling/client/react-native/node_modules/
|
||||
pipecat/examples/bot-ready-signalling/client/react-native/.expo/
|
||||
pipecat/examples/bot-ready-signalling/client/react-native/dist/
|
||||
pipecat/examples/bot-ready-signalling/client/react-native/npm-debug.*
|
||||
pipecat/examples/bot-ready-signalling/client/react-native/*.jks
|
||||
pipecat/examples/bot-ready-signalling/client/react-native/*.p8
|
||||
pipecat/examples/bot-ready-signalling/client/react-native/*.p12
|
||||
pipecat/examples/bot-ready-signalling/client/react-native/*.key
|
||||
pipecat/examples/bot-ready-signalling/client/react-native/*.mobileprovision
|
||||
pipecat/examples/bot-ready-signalling/client/react-native/*.orig.*
|
||||
pipecat/examples/bot-ready-signalling/client/react-native/web-build/
|
||||
|
||||
# macOS
|
||||
.DS_Store
|
||||
|
||||
|
||||
# Documentation
|
||||
docs/api/_build/
|
||||
|
||||
76
CHANGELOG.md
76
CHANGELOG.md
@@ -5,6 +5,82 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## Unreleased
|
||||
|
||||
### Added
|
||||
|
||||
- Added new log observers `LLMLogObserver` and `TranscriptionLogObserver` that
|
||||
can be useful for debugging your pipelines.
|
||||
|
||||
- Added `room_url` property to `DailyTransport`.
|
||||
|
||||
- Added `addons` argument to `DeepgramSTTService`.
|
||||
|
||||
- Added `exponential_backoff_time()` to `utils.network` module.
|
||||
|
||||
### Changed
|
||||
|
||||
- `AnthropicLLMService` now uses `claude-3-7-sonnet-20250219` as the default
|
||||
model.
|
||||
|
||||
- `RimeHttpTTSService` needs an `aiohttp.ClientSession` to be passed to the
|
||||
constructor as all the other HTTP-based services.
|
||||
|
||||
- `RimeHttpTTSService` doesn't use a default voice anymore.
|
||||
|
||||
- `DeepgramSTTService` now uses the new `nova-3` model by default. If you want
|
||||
to use the previous model you can pass `LiveOptions(model="nova-2-general")`.
|
||||
(see https://deepgram.com/learn/introducing-nova-3-speech-to-text-api)
|
||||
|
||||
```python
|
||||
stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
|
||||
```
|
||||
|
||||
### Removed
|
||||
|
||||
- Remove `TransportParams.audio_out_is_live` since it was not being used at all.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed a `ElevenLabsTTSService`, `FishAudioTTSService`, `LMNTTTSService` and
|
||||
`PlayHTTTSService` issue that was resulting in audio requested before an
|
||||
interruption being played after an interruption.
|
||||
|
||||
- Fixed `match_endofsentence` support for ellipses.
|
||||
|
||||
- Fixed an issue that would cause undesired interruptions via
|
||||
`EmulateUserStartedSpeakingFrame` when only interim transcriptions (i.e. no
|
||||
final transcriptions) where received.
|
||||
|
||||
- Fixed an issue where `EndTaskFrame` was not triggering
|
||||
`on_client_disconnected` or closing the WebSocket in FastAPI.
|
||||
|
||||
- Fixed an issue in `DeepgramSTTService` where the `sample_rate` passed to the
|
||||
`LiveOptions` was not being used, causing the service to use the default
|
||||
sample rate of pipeline.
|
||||
|
||||
- Fixed a context aggregator issue that would not append the LLM text response
|
||||
to the context if a function call happened in the same LLM turn.
|
||||
|
||||
- Fixed an issue that was causing HTTP TTS services to push `TTSStoppedFrame`
|
||||
more than once.
|
||||
|
||||
- Fixed a `FishAudioTTSService` issue where `TTSStoppedFrame` was not being
|
||||
pushed.
|
||||
|
||||
- Fixed an issue that `start_callback` was not invoked for some LLM services.
|
||||
|
||||
- Fixed an issue that would cause `DeepgramSTTService` to stop working after an
|
||||
error occurred (e.g. sudden network loss). If the network recovered we would
|
||||
not reconnect.
|
||||
|
||||
- Fixed a `STTMuteFilter` issue that would not mute user audio frames causing
|
||||
transcriptions to be generated by the STT service.
|
||||
|
||||
### Other
|
||||
|
||||
- Added Gemini support to `examples/phone-chatbot`.
|
||||
|
||||
## [0.0.57] - 2025-02-14
|
||||
|
||||
### Added
|
||||
|
||||
@@ -18,6 +18,9 @@ AZURE_DALLE_API_KEY=...
|
||||
AZURE_DALLE_ENDPOINT=https://...
|
||||
AZURE_DALLE_MODEL=...
|
||||
|
||||
# Cartesia
|
||||
CARTESIA_API_KEY=...
|
||||
|
||||
# Daily
|
||||
DAILY_API_KEY=...
|
||||
DAILY_SAMPLE_ROOM_URL=https://...
|
||||
|
||||
1
examples/bot-ready-signalling/client/react-native/.nvmrc
Normal file
1
examples/bot-ready-signalling/client/react-native/.nvmrc
Normal file
@@ -0,0 +1 @@
|
||||
22.14
|
||||
60
examples/bot-ready-signalling/client/react-native/README.md
Normal file
60
examples/bot-ready-signalling/client/react-native/README.md
Normal file
@@ -0,0 +1,60 @@
|
||||
# React Native Implementation
|
||||
|
||||
Basic implementation using the [Pipecat React Native SDK](https://docs.pipecat.ai/client/react-native/introduction).
|
||||
|
||||
## Usage
|
||||
|
||||
### Expo requirements
|
||||
|
||||
This project cannot be used with an [Expo Go](https://docs.expo.dev/workflow/expo-go/) app because [it requires custom native code](https://docs.expo.io/workflow/customizing/).
|
||||
|
||||
When a project requires custom native code or a config plugin, we need to transition from using [Expo Go](https://docs.expo.dev/workflow/expo-go/)
|
||||
to a [development build](https://docs.expo.dev/development/introduction/).
|
||||
|
||||
More details about the custom native code used by this demo can be found in [rn-daily-js-expo-config-plugin](https://github.com/daily-co/rn-daily-js-expo-config-plugin).
|
||||
|
||||
### Building remotely
|
||||
|
||||
If you do not have experience with Xcode and Android Studio builds or do not have them installed locally on your computer, you will need to follow [this guide from Expo to use EAS Build](https://docs.expo.dev/development/create-development-builds/#create-and-install-eas-build).
|
||||
|
||||
### Building locally
|
||||
|
||||
You will need to have installed locally on your computer:
|
||||
- [Xcode](https://developer.apple.com/xcode/) to build for iOS;
|
||||
- [Android Studio](https://developer.android.com/studio) to build for Android;
|
||||
|
||||
#### Install the demo dependencies
|
||||
|
||||
```bash
|
||||
# Use the version of node specified in .nvmrc
|
||||
nvm i
|
||||
|
||||
# Install dependencies
|
||||
npm i
|
||||
|
||||
# Before a native app can be compiled, the native source code must be generated.
|
||||
npx expo prebuild
|
||||
|
||||
# Configure the environment variable to connect to the local server
|
||||
cp env.example .env
|
||||
# edit .env and add your local ip address, for example: http://192.168.1.16:7860
|
||||
```
|
||||
|
||||
#### Running on Android
|
||||
|
||||
After plugging in an Android device [configured for debugging](https://developer.android.com/studio/debug/dev-options), run the following command:
|
||||
|
||||
```
|
||||
npm run android
|
||||
```
|
||||
|
||||
#### Running on iOS
|
||||
|
||||
Run the following command:
|
||||
|
||||
```
|
||||
npm run ios
|
||||
```
|
||||
|
||||
#### Connect to the server
|
||||
Use the http://localhost:5173 in your app.
|
||||
75
examples/bot-ready-signalling/client/react-native/app.json
Normal file
75
examples/bot-ready-signalling/client/react-native/app.json
Normal file
@@ -0,0 +1,75 @@
|
||||
{
|
||||
"expo": {
|
||||
"name": "bot-ready-rn",
|
||||
"slug": "bot-ready-rn",
|
||||
"version": "1.0.0",
|
||||
"orientation": "portrait",
|
||||
"icon": "./assets/icon.png",
|
||||
"userInterfaceStyle": "light",
|
||||
"splash": {
|
||||
"image": "./assets/splash.png",
|
||||
"resizeMode": "contain",
|
||||
"backgroundColor": "#ffffff"
|
||||
},
|
||||
"updates": {
|
||||
"fallbackToCacheTimeout": 0
|
||||
},
|
||||
"assetBundlePatterns": [
|
||||
"**/*"
|
||||
],
|
||||
"ios": {
|
||||
"supportsTablet": true,
|
||||
"bitcode": false,
|
||||
"bundleIdentifier": "co.daily.expo.BotReady",
|
||||
"infoPlist": {
|
||||
"UIBackgroundModes": [
|
||||
"voip"
|
||||
]
|
||||
},
|
||||
"appleTeamId": "EEBGKV9N3N"
|
||||
},
|
||||
"android": {
|
||||
"adaptiveIcon": {
|
||||
"foregroundImage": "./assets/adaptive-icon.png",
|
||||
"backgroundColor": "#FFFFFF"
|
||||
},
|
||||
"package": "co.daily.expo.BotReady",
|
||||
"permissions": [
|
||||
"android.permission.ACCESS_NETWORK_STATE",
|
||||
"android.permission.BLUETOOTH",
|
||||
"android.permission.CAMERA",
|
||||
"android.permission.INTERNET",
|
||||
"android.permission.MODIFY_AUDIO_SETTINGS",
|
||||
"android.permission.RECORD_AUDIO",
|
||||
"android.permission.SYSTEM_ALERT_WINDOW",
|
||||
"android.permission.WAKE_LOCK",
|
||||
"android.permission.FOREGROUND_SERVICE",
|
||||
"android.permission.FOREGROUND_SERVICE_CAMERA",
|
||||
"android.permission.FOREGROUND_SERVICE_MICROPHONE",
|
||||
"android.permission.FOREGROUND_SERVICE_MEDIA_PROJECTION",
|
||||
"android.permission.POST_NOTIFICATIONS"
|
||||
]
|
||||
},
|
||||
"web": {
|
||||
"favicon": "./assets/favicon.png"
|
||||
},
|
||||
"plugins": [
|
||||
"@config-plugins/react-native-webrtc",
|
||||
"@daily-co/config-plugin-rn-daily-js",
|
||||
[
|
||||
"expo-build-properties",
|
||||
{
|
||||
"android": {
|
||||
"minSdkVersion": 24,
|
||||
"compileSdkVersion": 35,
|
||||
"targetSdkVersion": 34,
|
||||
"buildToolsVersion": "35.0.0"
|
||||
},
|
||||
"ios": {
|
||||
"deploymentTarget": "15.1"
|
||||
}
|
||||
}
|
||||
]
|
||||
]
|
||||
}
|
||||
}
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 17 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 1.4 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 22 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 46 KiB |
@@ -0,0 +1,7 @@
|
||||
module.exports = function(api) {
|
||||
api.cache(true);
|
||||
return {
|
||||
presets: ['babel-preset-expo'],
|
||||
plugins: [["module:react-native-dotenv"]],
|
||||
};
|
||||
};
|
||||
@@ -0,0 +1 @@
|
||||
API_BASE_URL=http://YOUR_LOCAL_IP:7860
|
||||
7
examples/bot-ready-signalling/client/react-native/index.js
vendored
Normal file
7
examples/bot-ready-signalling/client/react-native/index.js
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
import { registerRootComponent } from "expo";
|
||||
|
||||
import App from "./src/App";
|
||||
|
||||
// registerRootComponent calls AppRegistry.registerComponent('main', () => App);
|
||||
// It also ensures that the environment is set up appropriately
|
||||
registerRootComponent(App);
|
||||
@@ -0,0 +1,4 @@
|
||||
// Learn more https://docs.expo.io/guides/customizing-metro
|
||||
const { getDefaultConfig } = require('expo/metro-config');
|
||||
|
||||
module.exports = getDefaultConfig(__dirname);
|
||||
10983
examples/bot-ready-signalling/client/react-native/package-lock.json
generated
Normal file
10983
examples/bot-ready-signalling/client/react-native/package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,31 @@
|
||||
{
|
||||
"name": "bot-ready-rn",
|
||||
"version": "1.0.0",
|
||||
"scripts": {
|
||||
"start": "expo start --dev-client",
|
||||
"android": "expo run:android --device",
|
||||
"ios": "expo run:ios --device",
|
||||
"web": "expo start --web"
|
||||
},
|
||||
"dependencies": {
|
||||
"@config-plugins/react-native-webrtc": "^10.0.0",
|
||||
"@daily-co/config-plugin-rn-daily-js": "0.0.7",
|
||||
"@daily-co/react-native-daily-js": "^0.70.0",
|
||||
"@daily-co/react-native-webrtc": "^118.0.3-daily.2",
|
||||
"@react-native-async-storage/async-storage": "1.23.1",
|
||||
"expo": "^52.0.0",
|
||||
"expo-build-properties": "~0.13.1",
|
||||
"expo-dev-client": "~5.0.5",
|
||||
"expo-splash-screen": "~0.29.16",
|
||||
"expo-status-bar": "~2.0.0",
|
||||
"react": "18.3.1",
|
||||
"react-native": "0.76.3",
|
||||
"react-native-background-timer": "^2.4.1",
|
||||
"react-native-dotenv": "^3.4.11",
|
||||
"react-native-get-random-values": "^1.11.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@babel/core": "^7.12.9"
|
||||
},
|
||||
"private": true
|
||||
}
|
||||
121
examples/bot-ready-signalling/client/react-native/src/App.js
vendored
Normal file
121
examples/bot-ready-signalling/client/react-native/src/App.js
vendored
Normal file
@@ -0,0 +1,121 @@
|
||||
import React, { useState, useEffect } from 'react';
|
||||
import {SafeAreaView, View, Text, Button, StyleSheet, ScrollView} from 'react-native';
|
||||
import Daily from "@daily-co/react-native-daily-js";
|
||||
import { API_BASE_URL } from "@env";
|
||||
|
||||
const CallScreen = () => {
|
||||
const [connectionStatus, setConnectionStatus] = useState('Disconnected');
|
||||
const [isConnected, setIsConnected] = useState(false);
|
||||
const [callObject, setCallObject] = useState(null);
|
||||
const [logs, setLogs] = useState([]);
|
||||
|
||||
useEffect(() => {
|
||||
if (callObject) {
|
||||
setupTrackListeners(callObject);
|
||||
}
|
||||
}, [callObject]);
|
||||
|
||||
const log = (message) => {
|
||||
setLogs((prevLogs) => [...prevLogs, `${new Date().toISOString()} - ${message}`]);
|
||||
console.log(message);
|
||||
};
|
||||
|
||||
const setupTrackListeners = (callObject) => {
|
||||
callObject.on("joined-meeting", () => {
|
||||
setConnectionStatus('Connected');
|
||||
setIsConnected(true);
|
||||
log('Client connected');
|
||||
});
|
||||
callObject.on("left-meeting", () => {
|
||||
setConnectionStatus('Disconnected');
|
||||
setIsConnected(false);
|
||||
log('Client disconnected');
|
||||
});
|
||||
callObject.on("participant-left", () => {
|
||||
// When the bot leaves, we are also disconnecting from the call
|
||||
disconnect().catch((err) => {
|
||||
log(`Failed to disconnect ${err}`);
|
||||
})
|
||||
});
|
||||
// Trigger so the bot can start sending audio
|
||||
callObject.on("track-started", (evt) => {
|
||||
if (evt.track.kind === "audio" && evt.participant.local === false) {
|
||||
handleEventToConsole(evt)
|
||||
log("Sending the message that will trigger the bot to play the audio.")
|
||||
callObject.sendAppMessage("playable")
|
||||
}
|
||||
});
|
||||
callObject.on("error", (evt) => log(`Error: ${evt.error}`));
|
||||
// Other events just for awareness
|
||||
callObject.on("track-stopped", handleEventToConsole);
|
||||
callObject.on("participant-joined", handleEventToConsole);
|
||||
callObject.on("participant-updated", handleEventToConsole);
|
||||
};
|
||||
|
||||
const handleEventToConsole = (evt) => {
|
||||
log(`Received event: ${evt.action}`);
|
||||
};
|
||||
|
||||
const connect = async () => {
|
||||
try {
|
||||
const callObject = Daily.createCallObject({ subscribeToTracksAutomatically: true });
|
||||
setCallObject(callObject);
|
||||
const connectionUrl = `${API_BASE_URL}/connect`
|
||||
const res = await fetch(connectionUrl, { method: "POST", headers: { "Content-Type": "application/json" } });
|
||||
const roomInfo = await res.json();
|
||||
await callObject.join({ url: roomInfo.room_url });
|
||||
} catch (error) {
|
||||
log(`Error connecting: ${error.message}`);
|
||||
}
|
||||
};
|
||||
|
||||
const disconnect = async () => {
|
||||
if (callObject) {
|
||||
try {
|
||||
await callObject.leave();
|
||||
await callObject.destroy();
|
||||
setCallObject(null);
|
||||
} catch (error) {
|
||||
log(`Error disconnecting: ${error.message}`);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<SafeAreaView style={styles.safeArea}>
|
||||
<View style={styles.container}>
|
||||
<View style={styles.statusBar}>
|
||||
<Text>Status: <Text style={styles.status}>{connectionStatus}</Text></Text>
|
||||
<View style={styles.controls}>
|
||||
<Button
|
||||
title={isConnected ? "Disconnect" : "Connect"}
|
||||
onPress={isConnected ? disconnect : connect}
|
||||
/>
|
||||
</View>
|
||||
</View>
|
||||
|
||||
<View style={styles.debugPanel}>
|
||||
<Text style={styles.debugTitle}>Debug Info</Text>
|
||||
<ScrollView style={styles.debugLog}>
|
||||
{logs.map((logEntry, index) => (
|
||||
<Text key={index} style={styles.logText}>{logEntry}</Text>
|
||||
))}
|
||||
</ScrollView>
|
||||
</View>
|
||||
</View>
|
||||
</SafeAreaView>
|
||||
);
|
||||
};
|
||||
|
||||
const styles = StyleSheet.create({
|
||||
safeArea: { flex: 1, backgroundColor: '#f0f0f0', padding: 20 },
|
||||
container: { flex: 1, margin: 20 },
|
||||
statusBar: { flexDirection: 'row', justifyContent: 'space-between', alignItems: 'center', padding: 10, backgroundColor: '#fff', borderRadius: 8, marginBottom: 20 },
|
||||
status: { fontWeight: 'bold' },
|
||||
controls: { flexDirection: 'row', gap: 10 },
|
||||
debugPanel: { height: '80%', backgroundColor: '#fff', borderRadius: 8, padding: 20},
|
||||
debugTitle: { fontSize: 16, fontWeight: 'bold' },
|
||||
debugLog: { height: '100%', overflow: 'scroll', backgroundColor: '#f8f8f8', padding: 10, borderRadius: 4, fontFamily: 'monospace', fontSize: 12, lineHeight: 1.4 },
|
||||
});
|
||||
|
||||
export default CallScreen;
|
||||
@@ -14,6 +14,7 @@ from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -30,6 +31,12 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
video_participant_id = None
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
|
||||
location = arguments["location"]
|
||||
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
|
||||
@@ -63,7 +70,7 @@ async def main():
|
||||
)
|
||||
|
||||
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
|
||||
llm.register_function("get_weather", get_weather)
|
||||
llm.register_function("get_weather", get_weather, start_fetch_weather)
|
||||
llm.register_function("get_image", get_image)
|
||||
|
||||
tools = [
|
||||
|
||||
@@ -38,7 +38,6 @@ async def main():
|
||||
"GStreamer",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
audio_out_is_live=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_width=1280,
|
||||
camera_out_height=720,
|
||||
|
||||
@@ -18,12 +18,10 @@ from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMTextFrame,
|
||||
StartInterruptionFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -73,38 +71,6 @@ class DebugObserver(BaseObserver):
|
||||
logger.info(f"🤖 BOT STOP SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
|
||||
|
||||
|
||||
class LLMLogObserver(BaseObserver):
|
||||
"""Observer to log LLM activity to the console.
|
||||
|
||||
Logs all frame instances of:
|
||||
- LLMFullResponseStartFrame (only from LLM service)
|
||||
- LLMTextFrame
|
||||
- LLMFullResponseEndFrame (only from LLM service)
|
||||
|
||||
This allows you to track when the LLM starts responding, what it generates, and when it finishes.
|
||||
Log format: [LLM EVENT]: [details] at [timestamp]s
|
||||
"""
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
time_sec = timestamp / 1_000_000_000
|
||||
|
||||
# Only log start/end frames from OpenAILLMService
|
||||
if isinstance(frame, (LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
|
||||
if isinstance(src, OpenAILLMService):
|
||||
event = "START" if isinstance(frame, LLMFullResponseStartFrame) else "END"
|
||||
logger.info(f"🧠 LLM {event} RESPONSE at {time_sec:.2f}s")
|
||||
# Log all LLMTextFrames
|
||||
elif isinstance(frame, LLMTextFrame):
|
||||
logger.info(f"🧠 LLM GENERATING: {frame.text!r} at {time_sec:.2f}s")
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
<!-- @format -->
|
||||
|
||||
<div align="center">
|
||||
<img alt="pipecat" width="300px" height="auto" src="image.png">
|
||||
</div>
|
||||
@@ -104,6 +106,21 @@ curl -X POST "http://localhost:7860/daily_start_bot" \
|
||||
-d '{"dialoutNumber": "+18057145330", "detectVoicemail": true}'
|
||||
```
|
||||
|
||||
### New! Using Gemini with Daily
|
||||
|
||||
We have introduced a new example file that uses Gemini. You can find the code within bot_daily_gemini.py.
|
||||
If you want to spin up a Gemini-based bot for this demo, instead of an OpenAI-based bot, call the same properties above but on the `daily_gemini_start_bot` endpoint instead.
|
||||
|
||||
For example:
|
||||
|
||||
```shell
|
||||
curl -X POST "http://localhost:7860/daily_gemini_start_bot" \ py pipecat
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"detectVoicemail": true}'
|
||||
```
|
||||
|
||||
Any request body properties supported by `/daily_start_bot` (such as "detectVoicemail", "dialoutnumber", etc) can also be passed to `/daily_gemini_start_bot`. The only difference is that calling the Gemini endpoint will start a Gemini bot session.
|
||||
|
||||
### More information
|
||||
|
||||
For more configuration options, please consult [Daily's API documentation](https://docs.daily.co).
|
||||
|
||||
@@ -98,6 +98,7 @@ async def main(
|
||||
- **"Record your message after the tone."**
|
||||
- **Any phrase that suggests an answering machine or voicemail.**
|
||||
- **ASSUME IT IS A VOICEMAIL. DO NOT WAIT FOR MORE CONFIRMATION.**
|
||||
- **IF THE CALL SAYS "PLEASE LEAVE A MESSAGE AFTER THE BEEP", WAIT FOR THE BEEP BEFORE LEAVING A MESSAGE.**
|
||||
|
||||
#### **Step 2: Leave a Voicemail Message**
|
||||
- Immediately say:
|
||||
@@ -110,7 +111,9 @@ async def main(
|
||||
- If the call is answered by a human, say:
|
||||
*"Oh, hello! I'm a friendly chatbot. Is there anything I can help you with?"*
|
||||
- Keep responses **brief and helpful**.
|
||||
- If the user no longer needs assistance, **call `terminate_call` immediately.**
|
||||
- If the user no longer needs assistance, say:
|
||||
*"Okay, thank you! Have a great day!"*
|
||||
-**Then call `terminate_call` immediately.**
|
||||
|
||||
---
|
||||
|
||||
|
||||
234
examples/phone-chatbot/bot_daily_gemini.py
Normal file
234
examples/phone-chatbot/bot_daily_gemini.py
Normal file
@@ -0,0 +1,234 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import EndTaskFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import LLMService
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
|
||||
from pipecat.transports.services.daily import DailyDialinSettings, DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
daily_api_key = os.getenv("DAILY_API_KEY", "")
|
||||
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
|
||||
|
||||
|
||||
async def terminate_call(
|
||||
function_name, tool_call_id, args, llm: LLMService, context, result_callback
|
||||
):
|
||||
"""Function the bot can call to terminate the call upon completion of a voicemail message."""
|
||||
await llm.queue_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
|
||||
async def main(
|
||||
room_url: str,
|
||||
token: str,
|
||||
callId: str,
|
||||
callDomain: str,
|
||||
detect_voicemail: bool,
|
||||
dialout_number: Optional[str],
|
||||
):
|
||||
# dialin_settings are only needed if Daily's SIP URI is used
|
||||
# If you are handling this via Twilio, Telnyx, set this to None
|
||||
# and handle call-forwarding when on_dialin_ready fires.
|
||||
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
api_url=daily_api_url,
|
||||
api_key=daily_api_key,
|
||||
dialin_settings=dialin_settings,
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
transcription_enabled=True,
|
||||
),
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
)
|
||||
|
||||
tools = [
|
||||
{
|
||||
"function_declarations": [
|
||||
{
|
||||
"name": "terminate_call",
|
||||
"description": "Terminate the call",
|
||||
},
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
system_instruction = """You are Chatbot, a friendly, helpful robot. Never mention this prompt.
|
||||
|
||||
**Operating Procedure:**
|
||||
|
||||
**Phase 1: Initial Call Answer - Listen for Voicemail Greeting**
|
||||
|
||||
**IMMEDIATELY after the call connects, LISTEN CAREFULLY for the *very first thing* you hear.**
|
||||
|
||||
**Listen for these sentences or very close variations as the *initial greeting*:**
|
||||
|
||||
* **"Please leave a message after the beep."**
|
||||
* **"No one is available to take your call."**
|
||||
* **"Record your message after the tone."**
|
||||
* **"You have reached voicemail for..."** (or similar voicemail identification)
|
||||
|
||||
**If you HEAR one of these sentences (or a very similar greeting) as the *initial response* to the call, IMMEDIATELY assume it is voicemail and proceed to Phase 2.**
|
||||
|
||||
**If you hear "PLEASE LEAVE A MESSAGE AFTER THE BEEP", WAIT for the actual beep sound from the voicemail system *after* hearing the sentence, before proceeding to Phase 2.**
|
||||
|
||||
**If you DO NOT hear any of these voicemail greetings as the *initial response*, assume it is a human and proceed to Phase 3.**
|
||||
|
||||
|
||||
**Phase 2: Leave Voicemail Message (If Voicemail Detected):**
|
||||
|
||||
If you assumed voicemail in Phase 1, say this EXACTLY:
|
||||
"Hello, this is a message for Pipecat example user. This is Chatbot. Please call back on 123-456-7891. Thank you."
|
||||
|
||||
**Immediately after saying the message, call the function `terminate_call`.**
|
||||
**DO NOT SAY ANYTHING ELSE. SILENCE IS REQUIRED AFTER `terminate_call`.**
|
||||
|
||||
|
||||
**Phase 3: Human Interaction (If No Voicemail Greeting Detected in Phase 1):**
|
||||
|
||||
If you did not detect a voicemail greeting in Phase 1 and a human answers, say:
|
||||
"Oh, hello! I'm a friendly chatbot. Is there anything I can help you with?"
|
||||
|
||||
Keep your responses **short and helpful.**
|
||||
|
||||
If the human is finished, say:
|
||||
"Okay, thank you! Have a great day!"
|
||||
|
||||
**Then, immediately call the function `terminate_call`.**
|
||||
|
||||
|
||||
**VERY IMPORTANT RULES - DO NOT DO THESE THINGS:**
|
||||
|
||||
* **DO NOT SAY "Please leave a message after the beep."**
|
||||
* **DO NOT SAY "No one is available to take your call."**
|
||||
* **DO NOT SAY "Record your message after the tone."**
|
||||
* **DO NOT SAY ANY voicemail greeting yourself.**
|
||||
* **Only check for voicemail greetings in Phase 1, *immediately after the call connects*.**
|
||||
* **After voicemail or human interaction, ALWAYS call `terminate_call` immediately.**
|
||||
* **Do not speak after calling `terminate_call`.**
|
||||
* Your speech will be audio, so use simple language without special characters.
|
||||
"""
|
||||
|
||||
llm = GoogleLLMService(
|
||||
model="models/gemini-2.0-flash-exp",
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
system_instruction=system_instruction,
|
||||
tools=tools,
|
||||
)
|
||||
llm.register_function("terminate_call", terminate_call)
|
||||
|
||||
context = GoogleLLMContext()
|
||||
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
context_aggregator.user(), # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
context_aggregator.assistant(), # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(allow_interruptions=True),
|
||||
)
|
||||
|
||||
if dialout_number:
|
||||
logger.debug("dialout number detected; doing dialout")
|
||||
|
||||
# Configure some handlers for dialing out
|
||||
@transport.event_handler("on_joined")
|
||||
async def on_joined(transport, data):
|
||||
logger.debug(f"Joined; starting dialout to: {dialout_number}")
|
||||
await transport.start_dialout({"phoneNumber": dialout_number})
|
||||
|
||||
@transport.event_handler("on_dialout_connected")
|
||||
async def on_dialout_connected(transport, data):
|
||||
logger.debug(f"Dial-out connected: {data}")
|
||||
|
||||
@transport.event_handler("on_dialout_answered")
|
||||
async def on_dialout_answered(transport, data):
|
||||
logger.debug(f"Dial-out answered: {data}")
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# unlike the dialin case, for the dialout case, the caller will speak first. Presumably
|
||||
# they will answer the phone and say "Hello?" Since we've captured their transcript,
|
||||
# That will put a frame into the pipeline and prompt an LLM completion, which is how the
|
||||
# bot will then greet the user.
|
||||
elif detect_voicemail:
|
||||
logger.debug("Detect voicemail example. You can test this in example in Daily Prebuilt")
|
||||
|
||||
# For the voicemail detection case, we do not want the bot to answer the phone. We want it to wait for the voicemail
|
||||
# machine to say something like 'Leave a message after the beep', or for the user to say 'Hello?'.
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
else:
|
||||
logger.debug("no dialout number; assuming dialin")
|
||||
|
||||
# Different handlers for dialin
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# For the dialin case, we want the bot to answer the phone and greet the user. We
|
||||
# can prompt the bot to speak by putting the context into the pipeline.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Pipecat Simple ChatBot")
|
||||
parser.add_argument("-u", type=str, help="Room URL")
|
||||
parser.add_argument("-t", type=str, help="Token")
|
||||
parser.add_argument("-i", type=str, help="Call ID")
|
||||
parser.add_argument("-d", type=str, help="Call Domain")
|
||||
parser.add_argument("-v", action="store_true", help="Detect voicemail")
|
||||
parser.add_argument("-o", type=str, help="Dialout number", default=None)
|
||||
config = parser.parse_args()
|
||||
|
||||
asyncio.run(main(config.u, config.t, config.i, config.d, config.v, config.o))
|
||||
@@ -110,10 +110,15 @@ async def _create_daily_room(
|
||||
|
||||
# Spawn a new agent, and join the user session
|
||||
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
|
||||
print(f"Vendor: {vendor}")
|
||||
if vendor == "daily":
|
||||
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {callId} -d {callDomain}{' -v' if detect_voicemail else ''}"
|
||||
if dialoutNumber:
|
||||
bot_proc += f" -o {dialoutNumber}"
|
||||
elif vendor == "daily-gemini":
|
||||
bot_proc = f"python3 -m bot_daily_gemini -u {room.url} -t {token} -i {callId} -d {callDomain}{' -v' if detect_voicemail else ''}"
|
||||
if dialoutNumber:
|
||||
bot_proc += f" -o {dialoutNumber}"
|
||||
else:
|
||||
bot_proc = f"python3 -m bot_twilio -u {room.url} -t {token} -i {callId} -s {room.config.sip_endpoint}"
|
||||
|
||||
@@ -201,6 +206,38 @@ async def daily_start_bot(request: Request) -> JSONResponse:
|
||||
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
|
||||
|
||||
|
||||
@app.post("/daily_gemini_start_bot")
|
||||
async def daily_gemini_start_bot(request: Request) -> JSONResponse:
|
||||
# The /daily_start_bot is invoked when a call is received on Daily's SIP URI
|
||||
# daily_start_bot will create the room, put the call on hold until
|
||||
# the bot and sip worker are ready. Daily will automatically
|
||||
# forward the call to the SIP URi when dialin_ready fires.
|
||||
|
||||
# Use specified room URL, or create a new one if not specified
|
||||
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
|
||||
# Get the dial-in properties from the request
|
||||
try:
|
||||
data = await request.json()
|
||||
if "test" in data:
|
||||
# Pass through any webhook checks
|
||||
return JSONResponse({"test": True})
|
||||
detect_voicemail = data.get("detectVoicemail", False)
|
||||
callId = data.get("callId", None)
|
||||
callDomain = data.get("callDomain", None)
|
||||
dialoutNumber = data.get("dialoutNumber", None)
|
||||
except Exception:
|
||||
raise HTTPException(
|
||||
status_code=500, detail="Missing properties 'callId', 'callDomain', or 'dialoutNumber'"
|
||||
)
|
||||
|
||||
room: DailyRoomObject = await _create_daily_room(
|
||||
room_url, callId, callDomain, dialoutNumber, "daily-gemini", detect_voicemail
|
||||
)
|
||||
|
||||
# Grab a token for the user to join with
|
||||
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
|
||||
|
||||
|
||||
# ----------------- Main ----------------- #
|
||||
|
||||
|
||||
|
||||
@@ -24,17 +24,15 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
|
||||
from pipecat.processors.transcript_processor import TranscriptProcessor
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import (
|
||||
DailyParams,
|
||||
DailyTransport,
|
||||
)
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -166,21 +164,32 @@ async def main():
|
||||
async def on_transcript_update(processor, frame):
|
||||
await transcript_handler.on_transcript_update(processor, frame)
|
||||
|
||||
rtvi = RTVIProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
rtvi,
|
||||
stt,
|
||||
transcript.user(), # User transcripts
|
||||
tp,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
transcript.assistant(),
|
||||
context_aggregator.assistant(),
|
||||
transcript.assistant(), # Assistant transcripts
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=False, # We don't want to interrupt the translator bot
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
observers=[RTVIObserver(rtvi)],
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
python-dotenv
|
||||
fastapi[all]
|
||||
pipecat-ai[daily,openai,azure]
|
||||
pipecat-ai[cartesia,daily,deepgram,openai,silero]
|
||||
aiohttp
|
||||
|
||||
@@ -33,7 +33,8 @@ dependencies = [
|
||||
"pydantic~=2.10.5",
|
||||
"pyloudnorm~=0.1.1",
|
||||
"resampy~=0.4.3",
|
||||
"soxr~=0.5.0"
|
||||
"soxr~=0.5.0",
|
||||
"openai~=1.59.6"
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
@@ -44,11 +45,11 @@ Website = "https://pipecat.ai"
|
||||
anthropic = [ "anthropic~=0.45.2" ]
|
||||
assemblyai = [ "assemblyai~=0.36.0" ]
|
||||
aws = [ "boto3~=1.35.99" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.42.0", "openai~=1.59.6" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
|
||||
canonical = [ "aiofiles~=24.1.0" ]
|
||||
cartesia = [ "cartesia~=1.3.1", "websockets~=13.1" ]
|
||||
cerebras = [ "openai~=1.59.6" ]
|
||||
deepseek = [ "openai~=1.59.6" ]
|
||||
cerebras = []
|
||||
deepseek = []
|
||||
daily = [ "daily-python~=0.14.2" ]
|
||||
deepgram = [ "deepgram-sdk~=3.8.0" ]
|
||||
elevenlabs = [ "websockets~=13.1" ]
|
||||
@@ -56,10 +57,10 @@ fal = [ "fal-client~=0.5.6" ]
|
||||
fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ]
|
||||
gladia = [ "websockets~=13.1" ]
|
||||
google = [ "google-cloud-speech~=2.31.0", "google-cloud-texttospeech~=2.25.0", "google-genai~=1.2.0", "google-generativeai~=0.8.4" ]
|
||||
grok = [ "openai~=1.59.6" ]
|
||||
groq = [ "openai~=1.59.6" ]
|
||||
grok = []
|
||||
groq = []
|
||||
gstreamer = [ "pygobject~=3.50.0" ]
|
||||
fireworks = [ "openai~=1.59.6" ]
|
||||
fireworks = []
|
||||
krisp = [ "pipecat-ai-krisp~=0.3.0" ]
|
||||
koala = [ "pvkoala~=2.0.3" ]
|
||||
langchain = [ "langchain~=0.3.14", "langchain-community~=0.3.14", "langchain-openai~=0.3.0" ]
|
||||
@@ -67,11 +68,11 @@ livekit = [ "livekit~=0.19.1", "livekit-api~=0.8.1", "tenacity~=9.0.0" ]
|
||||
lmnt = [ "websockets~=13.1" ]
|
||||
local = [ "pyaudio~=0.2.14" ]
|
||||
moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ]
|
||||
nim = [ "openai~=1.59.6" ]
|
||||
nim = []
|
||||
noisereduce = [ "noisereduce~=3.0.3" ]
|
||||
openai = [ "openai~=1.59.6", "websockets~=13.1", "python-deepcompare~=2.1.0" ]
|
||||
openai = [ "websockets~=13.1" ]
|
||||
openpipe = [ "openpipe~=4.45.0" ]
|
||||
perplexity = [ "openai~=1.59.6" ]
|
||||
perplexity = []
|
||||
playht = [ "pyht~=0.1.6", "websockets~=13.1" ]
|
||||
rime = [ "websockets~=13.1" ]
|
||||
riva = [ "nvidia-riva-client~=2.18.0" ]
|
||||
@@ -79,10 +80,10 @@ sentry = [ "sentry-sdk~=2.20.0" ]
|
||||
silero = [ "onnxruntime~=1.20.1" ]
|
||||
simli = [ "simli-ai~=0.1.10"]
|
||||
soundfile = [ "soundfile~=0.13.0" ]
|
||||
together = [ "openai~=1.59.6" ]
|
||||
together = []
|
||||
websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ]
|
||||
whisper = [ "faster-whisper~=1.1.1" ]
|
||||
openrouter = [ "openai~=1.59.6" ]
|
||||
openrouter = []
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
# All the following settings are optional:
|
||||
|
||||
0
src/pipecat/observers/loggers/__init__.py
Normal file
0
src/pipecat/observers/loggers/__init__.py
Normal file
85
src/pipecat/observers/loggers/llm_log_observer.py
Normal file
85
src/pipecat/observers/loggers/llm_log_observer.py
Normal file
@@ -0,0 +1,85 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMTextFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.ai_services import LLMService
|
||||
|
||||
|
||||
class LLMLogObserver(BaseObserver):
|
||||
"""Observer to log LLM activity to the console.
|
||||
|
||||
Logs all frame instances (only from/to LLM service) of:
|
||||
|
||||
- LLMFullResponseStartFrame
|
||||
- LLMFullResponseEndFrame
|
||||
- LLMTextFrame
|
||||
- FunctionCallInProgressFrame
|
||||
- LLMMessagesFrame
|
||||
- OpenAILLMContextFrame
|
||||
|
||||
This allows you to track when the LLM starts responding, what it generates,
|
||||
and when it finishes.
|
||||
|
||||
"""
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
if not isinstance(src, LLMService) and not isinstance(dst, LLMService):
|
||||
return
|
||||
|
||||
time_sec = timestamp / 1_000_000_000
|
||||
|
||||
arrow = "→"
|
||||
|
||||
# Log LLM start/end frames (output)
|
||||
if isinstance(frame, (LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
|
||||
event = "START" if isinstance(frame, LLMFullResponseStartFrame) else "END"
|
||||
logger.debug(f"🧠 {src} {arrow} LLM {event} RESPONSE at {time_sec:.2f}s")
|
||||
# Log all LLMTextFrames (output)
|
||||
elif isinstance(frame, LLMTextFrame):
|
||||
logger.debug(f"🧠 {src} {arrow} LLM GENERATING: {frame.text!r} at {time_sec:.2f}s")
|
||||
# Log function calling (output)
|
||||
elif (
|
||||
isinstance(frame, FunctionCallInProgressFrame)
|
||||
and direction != FrameDirection.DOWNSTREAM
|
||||
):
|
||||
logger.debug(
|
||||
f"🧠 {src} {arrow} LLM FUNCTION CALL ({frame.tool_call_id}): {frame.function_name!r}({frame.arguments}) at {time_sec:.2f}s"
|
||||
)
|
||||
# Log LLMMessagesFrame (input)
|
||||
elif isinstance(frame, LLMMessagesFrame):
|
||||
logger.debug(
|
||||
f"🧠 {arrow} {dst} LLM MESSAGES FRAME: {frame.messages} at {time_sec:.2f}s"
|
||||
)
|
||||
# Log OpenAILLMContextFrame (input)
|
||||
elif isinstance(frame, OpenAILLMContextFrame):
|
||||
logger.debug(
|
||||
f"🧠 {arrow} {dst} LLM CONTEXT FRAME: {frame.context.messages} at {time_sec:.2f}s"
|
||||
)
|
||||
# Log function call result (input)
|
||||
elif isinstance(frame, FunctionCallResultFrame):
|
||||
logger.debug(
|
||||
f"🧠 {arrow} {src} LLM FUNCTION CALL RESULT ({frame.tool_call_id}): {frame.result} at {time_sec:.2f}s"
|
||||
)
|
||||
54
src/pipecat/observers/loggers/transcription_log_observer.py
Normal file
54
src/pipecat/observers/loggers/transcription_log_observer.py
Normal file
@@ -0,0 +1,54 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.ai_services import STTService
|
||||
|
||||
|
||||
class TranscriptionLogObserver(BaseObserver):
|
||||
"""Observer to log transcription activity to the console.
|
||||
|
||||
Logs all frame instances (only from STT service) of:
|
||||
|
||||
- TranscriptionFrame
|
||||
- InterimTranscriptionFrame
|
||||
|
||||
This allows you to track when the LLM starts responding, what it generates,
|
||||
and when it finishes.
|
||||
|
||||
"""
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
src: FrameProcessor,
|
||||
dst: FrameProcessor,
|
||||
frame: Frame,
|
||||
direction: FrameDirection,
|
||||
timestamp: int,
|
||||
):
|
||||
if not isinstance(src, STTService):
|
||||
return
|
||||
|
||||
time_sec = timestamp / 1_000_000_000
|
||||
|
||||
arrow = "→"
|
||||
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
logger.debug(
|
||||
f"💬 {src} {arrow} TRANSCRIPTION: {frame.text!r} from {frame.user_id!r} at {time_sec:.2f}s"
|
||||
)
|
||||
elif isinstance(frame, InterimTranscriptionFrame):
|
||||
logger.debug(
|
||||
f"💬 {src} {arrow} INTERIM TRANSCRIPTION: {frame.text!r} from {frame.user_id!r} at {time_sec:.2f}s"
|
||||
)
|
||||
@@ -145,6 +145,9 @@ class LLMResponseAggregator(BaseLLMResponseAggregator):
|
||||
frame = LLMMessagesFrame(self._messages)
|
||||
await self.push_frame(frame)
|
||||
|
||||
# Reset our accumulator state.
|
||||
self.reset()
|
||||
|
||||
|
||||
class LLMContextResponseAggregator(BaseLLMResponseAggregator):
|
||||
"""This is a base LLM aggregator that uses an LLM context to store the
|
||||
@@ -290,7 +293,13 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
|
||||
await self.push_aggregation()
|
||||
|
||||
async def _handle_transcription(self, frame: TranscriptionFrame):
|
||||
self._aggregation += f" {frame.text}" if self._aggregation else frame.text
|
||||
text = frame.text
|
||||
|
||||
# Make sure we really have some text.
|
||||
if not text.strip():
|
||||
return
|
||||
|
||||
self._aggregation += f" {text}" if self._aggregation else text
|
||||
# We just got a final result, so let's reset interim results.
|
||||
self._seen_interim_results = False
|
||||
# Reset aggregation timer.
|
||||
@@ -298,8 +307,6 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
|
||||
|
||||
async def _handle_interim_transcription(self, _: InterimTranscriptionFrame):
|
||||
self._seen_interim_results = True
|
||||
# Reset aggregation timer.
|
||||
self._aggregation_event.set()
|
||||
|
||||
def _create_aggregation_task(self):
|
||||
self._aggregation_task = self.create_task(self._aggregation_task_handler())
|
||||
|
||||
@@ -12,6 +12,12 @@ from dataclasses import dataclass
|
||||
from typing import Any, Awaitable, Callable, List, Optional
|
||||
|
||||
from loguru import logger
|
||||
from openai._types import NOT_GIVEN, NotGiven
|
||||
from openai.types.chat import (
|
||||
ChatCompletionMessageParam,
|
||||
ChatCompletionToolChoiceOptionParam,
|
||||
ChatCompletionToolParam,
|
||||
)
|
||||
from PIL import Image
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
@@ -22,20 +28,6 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
try:
|
||||
from openai._types import NOT_GIVEN, NotGiven
|
||||
from openai.types.chat import (
|
||||
ChatCompletionMessageParam,
|
||||
ChatCompletionToolChoiceOptionParam,
|
||||
ChatCompletionToolParam,
|
||||
)
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
# JSON custom encoder to handle bytes arrays so that we can log contexts
|
||||
# with images to the console.
|
||||
|
||||
|
||||
@@ -22,9 +22,8 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
class AudioBufferProcessor(FrameProcessor):
|
||||
"""This processor buffers audio raw frames (input and output). The mixed
|
||||
audio can be obtained by calling `get_audio()` (if `buffer_size` is 0) or by
|
||||
registering an "on_audio_data" event handler. The event handler will be
|
||||
called every time `buffer_size` is reached.
|
||||
audio can be obtained by registering an "on_audio_data" event handler.
|
||||
The event handler will be called every time `buffer_size` is reached.
|
||||
|
||||
You can provide the desired output `sample_rate` and incoming audio frames
|
||||
will resampled to match it. Also, you can provide the number of channels, 1
|
||||
|
||||
@@ -23,6 +23,7 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
InputAudioRawFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
StopInterruptionFrame,
|
||||
@@ -185,13 +186,14 @@ class STTMuteFilter(FrameProcessor):
|
||||
StopInterruptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
),
|
||||
):
|
||||
# Only pass VAD-related frames when not muted
|
||||
if not self.is_muted:
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
logger.debug(f"{frame.__class__.__name__} suppressed - STT currently muted")
|
||||
logger.trace(f"{frame.__class__.__name__} suppressed - STT currently muted")
|
||||
else:
|
||||
# Pass all other frames through
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -15,6 +15,7 @@ from loguru import logger
|
||||
from pipecat.audio.utils import calculate_audio_volume, exp_smoothing
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
@@ -40,6 +41,7 @@ from pipecat.frames.frames import (
|
||||
from pipecat.metrics.metrics import MetricsData
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.websocket_service import WebsocketService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.string import match_endofsentence
|
||||
from pipecat.utils.text.base_text_filter import BaseTextFilter
|
||||
@@ -175,6 +177,7 @@ class LLMService(AIService):
|
||||
f = self._callbacks[None]
|
||||
else:
|
||||
return None
|
||||
await self.call_start_function(context, function_name)
|
||||
await context.call_function(
|
||||
f,
|
||||
function_name=function_name,
|
||||
@@ -208,7 +211,7 @@ class TTSService(AIService):
|
||||
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
|
||||
push_stop_frames: bool = False,
|
||||
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
|
||||
stop_frame_timeout_s: float = 1.0,
|
||||
stop_frame_timeout_s: float = 2.0,
|
||||
# if True, TTSService will push silence audio frames after TTSStoppedFrame
|
||||
push_silence_after_stop: bool = False,
|
||||
# if push_silence_after_stop is True, send this amount of audio silence
|
||||
@@ -433,6 +436,12 @@ class TTSService(AIService):
|
||||
|
||||
|
||||
class WordTTSService(TTSService):
|
||||
"""This is a base class for TTS services that support word timestamps. Word
|
||||
timestamps are useful to synchronize audio with text of the spoken
|
||||
words. This way only the spoken words are added to the conversation context.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._initial_word_timestamp = -1
|
||||
@@ -502,11 +511,93 @@ class WordTTSService(TTSService):
|
||||
self._words_queue.task_done()
|
||||
|
||||
|
||||
class AudioContextWordTTSService(WordTTSService):
|
||||
"""This services allow us to send multiple TTS request to the services. Each
|
||||
request could be multiple sentences long which are grouped by context. For
|
||||
this to work, the TTS service needs to support handling multiple requests at
|
||||
once (i.e. multiple simultaneous contexts).
|
||||
class WebsocketTTSService(TTSService, WebsocketService):
|
||||
"""This is a base class for websocket-based TTS services."""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
TTSService.__init__(self, **kwargs)
|
||||
WebsocketService.__init__(self)
|
||||
|
||||
|
||||
class InterruptibleTTSService(WebsocketTTSService):
|
||||
"""This is a base class for websocket-based TTS services that don't support
|
||||
word timestamps and that don't offer a way to correlate the generated audio
|
||||
to the requested text.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# Indicates if the bot is speaking. If the bot is not speaking we don't
|
||||
# need to reconnect when the user speaks. If the bot is speaking and the
|
||||
# user interrupts we need to reconnect.
|
||||
self._bot_speaking = False
|
||||
|
||||
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
|
||||
await super()._handle_interruption(frame, direction)
|
||||
if self._bot_speaking:
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, BotStartedSpeakingFrame):
|
||||
self._bot_speaking = True
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
self._bot_speaking = False
|
||||
|
||||
|
||||
class WebsocketWordTTSService(WordTTSService, WebsocketService):
|
||||
"""This is a base class for websocket-based TTS services that support word
|
||||
timestamps.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
WordTTSService.__init__(self, **kwargs)
|
||||
WebsocketService.__init__(self)
|
||||
|
||||
|
||||
class InterruptibleWordTTSService(WebsocketWordTTSService):
|
||||
"""This is a base class for websocket-based TTS services that support word
|
||||
timestamps but don't offer a way to correlate the generated audio to the
|
||||
requested text.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# Indicates if the bot is speaking. If the bot is not speaking we don't
|
||||
# need to reconnect when the user speaks. If the bot is speaking and the
|
||||
# user interrupts we need to reconnect.
|
||||
self._bot_speaking = False
|
||||
|
||||
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
|
||||
await super()._handle_interruption(frame, direction)
|
||||
if self._bot_speaking:
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, BotStartedSpeakingFrame):
|
||||
self._bot_speaking = True
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
self._bot_speaking = False
|
||||
|
||||
|
||||
class AudioContextWordTTSService(WebsocketWordTTSService):
|
||||
"""This is a base class for websocket-based TTS services that support word
|
||||
timestamps and also allow correlating the generated audio with the requested
|
||||
text.
|
||||
|
||||
Each request could be multiple sentences long which are grouped by
|
||||
context. For this to work, the TTS service needs to support handling
|
||||
multiple requests at once (i.e. multiple simultaneous contexts).
|
||||
|
||||
The audio received from the TTS will be played in context order. That is, if
|
||||
we requested audio for a context "A" and then audio for context "B", the
|
||||
|
||||
@@ -96,7 +96,7 @@ class AnthropicLLMService(LLMService):
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
model: str = "claude-3-5-sonnet-20241022",
|
||||
model: str = "claude-3-7-sonnet-20250219",
|
||||
params: InputParams = InputParams(),
|
||||
client=None,
|
||||
**kwargs,
|
||||
@@ -743,18 +743,19 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
run_llm = False
|
||||
properties: Optional[FunctionCallResultProperties] = None
|
||||
|
||||
aggregation = self._aggregation
|
||||
aggregation = self._aggregation.strip()
|
||||
self.reset()
|
||||
|
||||
try:
|
||||
if aggregation:
|
||||
self._context.add_message({"role": "assistant", "content": aggregation})
|
||||
|
||||
if self._function_call_result:
|
||||
frame = self._function_call_result
|
||||
properties = frame.properties
|
||||
self._function_call_result = None
|
||||
if frame.result:
|
||||
assistant_message = {"role": "assistant", "content": []}
|
||||
if aggregation:
|
||||
assistant_message["content"].append({"type": "text", "text": aggregation})
|
||||
assistant_message["content"].append(
|
||||
{
|
||||
"type": "tool_use",
|
||||
@@ -782,8 +783,6 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
else:
|
||||
# Default behavior
|
||||
run_llm = True
|
||||
elif aggregation:
|
||||
self._context.add_message({"role": "assistant", "content": aggregation})
|
||||
|
||||
if self._pending_image_frame_message:
|
||||
frame = self._pending_image_frame_message
|
||||
|
||||
@@ -10,6 +10,7 @@ from typing import AsyncGenerator, Optional
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
from openai import AsyncAzureOpenAI
|
||||
from PIL import Image
|
||||
from pydantic import BaseModel
|
||||
|
||||
@@ -48,7 +49,6 @@ try:
|
||||
PushAudioInputStream,
|
||||
)
|
||||
from azure.cognitiveservices.speech.dialog import AudioConfig
|
||||
from openai import AsyncAzureOpenAI
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
|
||||
@@ -7,22 +7,14 @@
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from loguru import logger
|
||||
from openai import AsyncOpenAI
|
||||
from openai.types.audio import Transcription
|
||||
|
||||
from pipecat.frames.frames import ErrorFrame, Frame, TranscriptionFrame
|
||||
from pipecat.services.ai_services import SegmentedSTTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
try:
|
||||
from openai import AsyncOpenAI
|
||||
from openai.types.audio import Transcription
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
def language_to_whisper_language(language: Language) -> Optional[str]:
|
||||
"""Language support for Whisper API.
|
||||
|
||||
@@ -13,22 +13,18 @@ from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSSpeakFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import AudioContextWordTTSService, TTSService
|
||||
from pipecat.services.websocket_service import WebsocketService
|
||||
from pipecat.transcriptions.language import Language
|
||||
|
||||
# See .env.example for Cartesia configuration needed
|
||||
@@ -75,7 +71,7 @@ def language_to_cartesia_language(language: Language) -> Optional[str]:
|
||||
return result
|
||||
|
||||
|
||||
class CartesiaTTSService(AudioContextWordTTSService, WebsocketService):
|
||||
class CartesiaTTSService(AudioContextWordTTSService):
|
||||
class InputParams(BaseModel):
|
||||
language: Optional[Language] = Language.EN
|
||||
speed: Optional[Union[str, float]] = ""
|
||||
@@ -105,15 +101,13 @@ class CartesiaTTSService(AudioContextWordTTSService, WebsocketService):
|
||||
# if we're interrupted. Cartesia gives us word-by-word timestamps. We
|
||||
# can use those to generate text frames ourselves aligned with the
|
||||
# playout timing of the audio!
|
||||
AudioContextWordTTSService.__init__(
|
||||
self,
|
||||
super().__init__(
|
||||
aggregate_sentences=True,
|
||||
push_text_frames=False,
|
||||
pause_frame_processing=True,
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
WebsocketService.__init__(self)
|
||||
|
||||
self._api_key = api_key
|
||||
self._cartesia_version = cartesia_version
|
||||
@@ -364,9 +358,6 @@ class CartesiaHttpTTSService(TTSService):
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
yield TTSStartedFrame()
|
||||
|
||||
try:
|
||||
voice_controls = None
|
||||
if self._settings["speed"] or self._settings["emotion"]:
|
||||
@@ -376,6 +367,8 @@ class CartesiaHttpTTSService(TTSService):
|
||||
if self._settings["emotion"]:
|
||||
voice_controls["emotion"] = self._settings["emotion"]
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
output = await self._client.tts.sse(
|
||||
model_id=self._model_name,
|
||||
transcript=text,
|
||||
@@ -386,14 +379,17 @@ class CartesiaHttpTTSService(TTSService):
|
||||
_experimental_voice_controls=voice_controls,
|
||||
)
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
yield TTSStartedFrame()
|
||||
|
||||
frame = TTSAudioRawFrame(
|
||||
audio=output["audio"], sample_rate=self.sample_rate, num_channels=1
|
||||
)
|
||||
|
||||
yield frame
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSStoppedFrame()
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -7,22 +7,12 @@
|
||||
from typing import List
|
||||
|
||||
from loguru import logger
|
||||
from openai import AsyncStream
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
|
||||
try:
|
||||
from openai import (
|
||||
AsyncStream,
|
||||
)
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use Cerebras, you need to `pip install pipecat-ai[cerebras]`. Also, set `CEREBRAS_API_KEY` environment variable."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class CerebrasLLMService(OpenAILLMService):
|
||||
"""A service for interacting with Cerebras's API using the OpenAI-compatible interface.
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
from typing import AsyncGenerator, Optional
|
||||
from typing import AsyncGenerator, Dict, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -34,6 +34,7 @@ try:
|
||||
AsyncListenWebSocketClient,
|
||||
DeepgramClient,
|
||||
DeepgramClientOptions,
|
||||
ErrorResponse,
|
||||
LiveOptions,
|
||||
LiveResultResponse,
|
||||
LiveTranscriptionEvents,
|
||||
@@ -120,14 +121,16 @@ class DeepgramSTTService(STTService):
|
||||
url: str = "",
|
||||
sample_rate: Optional[int] = None,
|
||||
live_options: Optional[LiveOptions] = None,
|
||||
addons: Optional[Dict] = None,
|
||||
**kwargs,
|
||||
):
|
||||
sample_rate = sample_rate or (live_options.sample_rate if live_options else None)
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
default_options = LiveOptions(
|
||||
encoding="linear16",
|
||||
language=Language.EN,
|
||||
model="nova-2-general",
|
||||
model="nova-3-general",
|
||||
channels=1,
|
||||
interim_results=True,
|
||||
smart_format=True,
|
||||
@@ -147,6 +150,7 @@ class DeepgramSTTService(STTService):
|
||||
merged_options.language = merged_options.language.value
|
||||
|
||||
self._settings = merged_options.to_dict()
|
||||
self._addons = addons
|
||||
|
||||
self._client = DeepgramClient(
|
||||
api_key,
|
||||
@@ -155,13 +159,10 @@ class DeepgramSTTService(STTService):
|
||||
options={"keepalive": "true"}, # verbose=logging.DEBUG
|
||||
),
|
||||
)
|
||||
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
|
||||
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
|
||||
|
||||
if self.vad_enabled:
|
||||
self._register_event_handler("on_speech_started")
|
||||
self._register_event_handler("on_utterance_end")
|
||||
self._connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)
|
||||
self._connection.on(LiveTranscriptionEvents.UtteranceEnd, self._on_utterance_end)
|
||||
|
||||
@property
|
||||
def vad_enabled(self):
|
||||
@@ -202,7 +203,25 @@ class DeepgramSTTService(STTService):
|
||||
|
||||
async def _connect(self):
|
||||
logger.debug("Connecting to Deepgram")
|
||||
if not await self._connection.start(self._settings):
|
||||
|
||||
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
|
||||
|
||||
self._connection.on(
|
||||
LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), self._on_message
|
||||
)
|
||||
self._connection.on(LiveTranscriptionEvents(LiveTranscriptionEvents.Error), self._on_error)
|
||||
|
||||
if self.vad_enabled:
|
||||
self._connection.on(
|
||||
LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted),
|
||||
self._on_speech_started,
|
||||
)
|
||||
self._connection.on(
|
||||
LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd),
|
||||
self._on_utterance_end,
|
||||
)
|
||||
|
||||
if not await self._connection.start(options=self._settings, addons=self._addons):
|
||||
logger.error(f"{self}: unable to connect to Deepgram")
|
||||
|
||||
async def _disconnect(self):
|
||||
@@ -214,6 +233,15 @@ class DeepgramSTTService(STTService):
|
||||
await self.start_ttfb_metrics()
|
||||
await self.start_processing_metrics()
|
||||
|
||||
async def _on_error(self, *args, **kwargs):
|
||||
error: ErrorResponse = kwargs["error"]
|
||||
logger.warning(f"{self} connection error, will retry: {error}")
|
||||
await self.stop_all_metrics()
|
||||
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
|
||||
# because this triggers more errors internally in the Deepgram SDK. So,
|
||||
# we just forget about the previous connection and create a new one.
|
||||
await self._connect()
|
||||
|
||||
async def _on_speech_started(self, *args, **kwargs):
|
||||
await self.start_metrics()
|
||||
await self._call_event_handler("on_speech_started", *args, **kwargs)
|
||||
|
||||
@@ -8,22 +8,12 @@
|
||||
from typing import List
|
||||
|
||||
from loguru import logger
|
||||
from openai import AsyncStream
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
|
||||
try:
|
||||
from openai import (
|
||||
AsyncStream,
|
||||
)
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use DeepSeek, you need to `pip install pipecat-ai[deepseek]`. Also, set `DEEPSEEK_API_KEY` environment variable."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class DeepSeekLLMService(OpenAILLMService):
|
||||
"""A service for interacting with DeepSeek's API using the OpenAI-compatible interface.
|
||||
|
||||
@@ -14,22 +14,18 @@ from loguru import logger
|
||||
from pydantic import BaseModel, model_validator
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSSpeakFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import TTSService, WordTTSService
|
||||
from pipecat.services.websocket_service import WebsocketService
|
||||
from pipecat.services.ai_services import InterruptibleWordTTSService, TTSService
|
||||
from pipecat.transcriptions.language import Language
|
||||
|
||||
# See .env.example for ElevenLabs configuration needed
|
||||
@@ -141,7 +137,7 @@ def calculate_word_times(
|
||||
return word_times
|
||||
|
||||
|
||||
class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
class ElevenLabsTTSService(InterruptibleWordTTSService):
|
||||
class InputParams(BaseModel):
|
||||
language: Optional[Language] = None
|
||||
optimize_streaming_latency: Optional[str] = None
|
||||
@@ -186,17 +182,14 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
|
||||
# Finally, ElevenLabs doesn't provide information on when the bot stops
|
||||
# speaking for a while, so we want the parent class to send TTSStopFrame
|
||||
# after a short period not receiving any audio.
|
||||
WordTTSService.__init__(
|
||||
self,
|
||||
super().__init__(
|
||||
aggregate_sentences=True,
|
||||
push_text_frames=False,
|
||||
push_stop_frames=True,
|
||||
stop_frame_timeout_s=2.0,
|
||||
pause_frame_processing=True,
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
WebsocketService.__init__(self)
|
||||
|
||||
self._api_key = api_key
|
||||
self._url = url
|
||||
@@ -567,18 +560,16 @@ class ElevenLabsHttpTTSService(TTSService):
|
||||
return
|
||||
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
yield TTSStartedFrame()
|
||||
|
||||
async for chunk in response.content:
|
||||
if chunk:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in run_tts: {e}")
|
||||
yield ErrorFrame(error=str(e))
|
||||
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -8,19 +8,11 @@
|
||||
from typing import List
|
||||
|
||||
from loguru import logger
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
|
||||
try:
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use Fireworks, you need to `pip install pipecat-ai[fireworks]`. Also, set `FIREWORKS_API_KEY` environment variable."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class FireworksLLMService(OpenAILLMService):
|
||||
"""A service for interacting with Fireworks AI using the OpenAI-compatible interface.
|
||||
|
||||
@@ -11,22 +11,18 @@ from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSSpeakFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import TTSService
|
||||
from pipecat.services.websocket_service import WebsocketService
|
||||
from pipecat.services.ai_services import InterruptibleTTSService
|
||||
from pipecat.transcriptions.language import Language
|
||||
|
||||
try:
|
||||
@@ -43,7 +39,7 @@ except ModuleNotFoundError as e:
|
||||
FishAudioOutputFormat = Literal["opus", "mp3", "pcm", "wav"]
|
||||
|
||||
|
||||
class FishAudioTTSService(TTSService, WebsocketService):
|
||||
class FishAudioTTSService(InterruptibleTTSService):
|
||||
class InputParams(BaseModel):
|
||||
language: Optional[Language] = Language.EN
|
||||
latency: Optional[str] = "normal" # "normal" or "balanced"
|
||||
@@ -60,7 +56,12 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(pause_frame_processing=True, sample_rate=sample_rate, **kwargs)
|
||||
super().__init__(
|
||||
push_stop_frames=True,
|
||||
pause_frame_processing=True,
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
self._api_key = api_key
|
||||
self._base_url = "wss://api.fish.audio/v1/tts/live"
|
||||
@@ -108,11 +109,12 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self.push_error))
|
||||
|
||||
async def _disconnect(self):
|
||||
await self._disconnect_websocket()
|
||||
if self._receive_task:
|
||||
await self.cancel_task(self._receive_task)
|
||||
self._receive_task = None
|
||||
|
||||
await self._disconnect_websocket()
|
||||
|
||||
async def _connect_websocket(self):
|
||||
try:
|
||||
logger.debug("Connecting to Fish Audio")
|
||||
@@ -147,6 +149,11 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
return self._websocket
|
||||
raise Exception("Websocket not connected")
|
||||
|
||||
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
|
||||
await super()._handle_interruption(frame, direction)
|
||||
await self.stop_all_metrics()
|
||||
self._request_id = None
|
||||
|
||||
async def _receive_messages(self):
|
||||
async for message in self._get_websocket():
|
||||
try:
|
||||
@@ -166,11 +173,6 @@ class FishAudioTTSService(TTSService, WebsocketService):
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing message: {e}")
|
||||
|
||||
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
|
||||
await super()._handle_interruption(frame, direction)
|
||||
await self.stop_all_metrics()
|
||||
self._request_id = None
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating Fish TTS: [{text}]")
|
||||
try:
|
||||
|
||||
@@ -565,10 +565,15 @@ class GoogleAssistantContextAggregator(OpenAIAssistantContextAggregator):
|
||||
run_llm = False
|
||||
properties: Optional[FunctionCallResultProperties] = None
|
||||
|
||||
aggregation = self._aggregation
|
||||
aggregation = self._aggregation.strip()
|
||||
self.reset()
|
||||
|
||||
try:
|
||||
if aggregation:
|
||||
self._context.add_message(
|
||||
glm.Content(role="model", parts=[glm.Part(text=aggregation)])
|
||||
)
|
||||
|
||||
if self._function_call_result:
|
||||
frame = self._function_call_result
|
||||
properties = frame.properties
|
||||
@@ -608,11 +613,6 @@ class GoogleAssistantContextAggregator(OpenAIAssistantContextAggregator):
|
||||
else:
|
||||
# Default behavior is to run the LLM if there are no function calls in progress
|
||||
run_llm = not bool(self._function_calls_in_progress)
|
||||
else:
|
||||
if aggregation.strip():
|
||||
self._context.add_message(
|
||||
glm.Content(role="model", parts=[glm.Part(text=aggregation)])
|
||||
)
|
||||
|
||||
if self._pending_image_frame_message:
|
||||
frame = self._pending_image_frame_message
|
||||
|
||||
@@ -37,10 +37,13 @@ class GrokAssistantContextAggregator(OpenAIAssistantContextAggregator):
|
||||
run_llm = False
|
||||
properties: Optional[FunctionCallResultProperties] = None
|
||||
|
||||
aggregation = self._aggregation
|
||||
aggregation = self._aggregation.strip()
|
||||
self.reset()
|
||||
|
||||
try:
|
||||
if aggregation:
|
||||
self._context.add_message({"role": "assistant", "content": aggregation})
|
||||
|
||||
if self._function_call_result:
|
||||
frame = self._function_call_result
|
||||
properties = frame.properties
|
||||
@@ -77,9 +80,6 @@ class GrokAssistantContextAggregator(OpenAIAssistantContextAggregator):
|
||||
# Default behavior is to run the LLM if there are no function calls in progress
|
||||
run_llm = not bool(self._function_calls_in_progress)
|
||||
|
||||
else:
|
||||
self._context.add_message({"role": "assistant", "content": aggregation})
|
||||
|
||||
if self._pending_image_frame_message:
|
||||
frame = self._pending_image_frame_message
|
||||
self._pending_image_frame_message = None
|
||||
|
||||
@@ -21,8 +21,7 @@ from pipecat.frames.frames import (
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import TTSService
|
||||
from pipecat.services.websocket_service import WebsocketService
|
||||
from pipecat.services.ai_services import InterruptibleTTSService
|
||||
from pipecat.transcriptions.language import Language
|
||||
|
||||
# See .env.example for LMNT configuration needed
|
||||
@@ -60,7 +59,7 @@ def language_to_lmnt_language(language: Language) -> Optional[str]:
|
||||
return result
|
||||
|
||||
|
||||
class LmntTTSService(TTSService, WebsocketService):
|
||||
class LmntTTSService(InterruptibleTTSService):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
@@ -70,14 +69,12 @@ class LmntTTSService(TTSService, WebsocketService):
|
||||
language: Language = Language.EN,
|
||||
**kwargs,
|
||||
):
|
||||
TTSService.__init__(
|
||||
self,
|
||||
super().__init__(
|
||||
push_stop_frames=True,
|
||||
pause_frame_processing=True,
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
WebsocketService.__init__(self)
|
||||
|
||||
self._api_key = api_key
|
||||
self._voice_id = voice_id
|
||||
@@ -116,12 +113,12 @@ class LmntTTSService(TTSService, WebsocketService):
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self.push_error))
|
||||
|
||||
async def _disconnect(self):
|
||||
await self._disconnect_websocket()
|
||||
|
||||
if self._receive_task:
|
||||
await self.cancel_task(self._receive_task)
|
||||
self._receive_task = None
|
||||
|
||||
await self._disconnect_websocket()
|
||||
|
||||
async def _connect_websocket(self):
|
||||
"""Connect to LMNT websocket."""
|
||||
try:
|
||||
@@ -153,8 +150,9 @@ class LmntTTSService(TTSService, WebsocketService):
|
||||
|
||||
if self._websocket:
|
||||
logger.debug("Disconnecting from LMNT")
|
||||
# Send EOF message before closing
|
||||
await self._websocket.send(json.dumps({"eof": True}))
|
||||
# NOTE(aleix): sending EOF message before closing is causing
|
||||
# errors on the websocket, so we just skip it for now.
|
||||
# await self._websocket.send(json.dumps({"eof": True}))
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
|
||||
|
||||
@@ -13,6 +13,14 @@ from typing import Any, AsyncGenerator, Dict, List, Literal, Optional
|
||||
import aiohttp
|
||||
import httpx
|
||||
from loguru import logger
|
||||
from openai import (
|
||||
NOT_GIVEN,
|
||||
AsyncOpenAI,
|
||||
AsyncStream,
|
||||
BadRequestError,
|
||||
DefaultAsyncHttpxClient,
|
||||
)
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
from PIL import Image
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
@@ -57,23 +65,6 @@ from pipecat.services.base_whisper import BaseWhisperSTTService, Transcription
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
try:
|
||||
from openai import (
|
||||
NOT_GIVEN,
|
||||
AsyncOpenAI,
|
||||
AsyncStream,
|
||||
BadRequestError,
|
||||
DefaultAsyncHttpxClient,
|
||||
)
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
ValidVoice = Literal["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
|
||||
|
||||
VALID_VOICES: Dict[str, ValidVoice] = {
|
||||
@@ -266,7 +257,6 @@ class BaseOpenAILLMService(LLMService):
|
||||
if tool_call.function and tool_call.function.name:
|
||||
function_name += tool_call.function.name
|
||||
tool_call_id = tool_call.id
|
||||
await self.call_start_function(context, function_name)
|
||||
if tool_call.function and tool_call.function.arguments:
|
||||
# Keep iterating through the response to collect all the argument fragments
|
||||
arguments += tool_call.function.arguments
|
||||
@@ -632,10 +622,13 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
run_llm = False
|
||||
properties: Optional[FunctionCallResultProperties] = None
|
||||
|
||||
aggregation = self._aggregation
|
||||
aggregation = self._aggregation.strip()
|
||||
self.reset()
|
||||
|
||||
try:
|
||||
if aggregation:
|
||||
self._context.add_message({"role": "assistant", "content": aggregation})
|
||||
|
||||
if self._function_call_result:
|
||||
frame = self._function_call_result
|
||||
properties = frame.properties
|
||||
@@ -670,9 +663,6 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
|
||||
# Default behavior is to run the LLM if there are no function calls in progress
|
||||
run_llm = not bool(self._function_calls_in_progress)
|
||||
|
||||
else:
|
||||
self._context.add_message({"role": "assistant", "content": aggregation})
|
||||
|
||||
if self._pending_image_frame_message:
|
||||
frame = self._pending_image_frame_message
|
||||
self._pending_image_frame_message = None
|
||||
|
||||
@@ -10,9 +10,17 @@ import json
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
|
||||
import websockets
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
import websockets
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
|
||||
@@ -7,12 +7,12 @@
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from loguru import logger
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
|
||||
try:
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
from openpipe import AsyncOpenAI as OpenPipeAI
|
||||
from openpipe import AsyncStream
|
||||
except ModuleNotFoundError as e:
|
||||
|
||||
@@ -7,24 +7,13 @@
|
||||
from typing import List
|
||||
|
||||
from loguru import logger
|
||||
from openai import NOT_GIVEN, AsyncStream
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
|
||||
from pipecat.metrics.metrics import LLMTokenUsage
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
|
||||
try:
|
||||
from openai import (
|
||||
NOT_GIVEN,
|
||||
AsyncStream,
|
||||
)
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use Perplexity, you need to `pip install pipecat-ai[perplexity]`. Also, set `PERPLEXITY_API_KEY` environment variable."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class PerplexityLLMService(OpenAILLMService):
|
||||
"""A service for interacting with Perplexity's API.
|
||||
|
||||
@@ -16,22 +16,18 @@ from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSSpeakFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import TTSService
|
||||
from pipecat.services.websocket_service import WebsocketService
|
||||
from pipecat.services.ai_services import InterruptibleTTSService, TTSService
|
||||
from pipecat.transcriptions.language import Language
|
||||
|
||||
try:
|
||||
@@ -100,7 +96,7 @@ def language_to_playht_language(language: Language) -> Optional[str]:
|
||||
return result
|
||||
|
||||
|
||||
class PlayHTTTSService(TTSService, WebsocketService):
|
||||
class PlayHTTTSService(InterruptibleTTSService):
|
||||
class InputParams(BaseModel):
|
||||
language: Optional[Language] = Language.EN
|
||||
speed: Optional[float] = 1.0
|
||||
@@ -118,13 +114,11 @@ class PlayHTTTSService(TTSService, WebsocketService):
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs,
|
||||
):
|
||||
TTSService.__init__(
|
||||
self,
|
||||
super().__init__(
|
||||
pause_frame_processing=True,
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
WebsocketService.__init__(self)
|
||||
|
||||
self._api_key = api_key
|
||||
self._user_id = user_id
|
||||
@@ -168,12 +162,12 @@ class PlayHTTTSService(TTSService, WebsocketService):
|
||||
self._receive_task = self.create_task(self._receive_task_handler(self.push_error))
|
||||
|
||||
async def _disconnect(self):
|
||||
await self._disconnect_websocket()
|
||||
|
||||
if self._receive_task:
|
||||
await self.cancel_task(self._receive_task)
|
||||
self._receive_task = None
|
||||
|
||||
await self._disconnect_websocket()
|
||||
|
||||
async def _connect_websocket(self):
|
||||
try:
|
||||
logger.debug("Connecting to PlayHT")
|
||||
@@ -397,6 +391,7 @@ class PlayHTHttpTTSService(TTSService):
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
yield TTSStartedFrame()
|
||||
|
||||
async for chunk in playht_gen:
|
||||
# skip the RIFF header.
|
||||
if in_header:
|
||||
@@ -416,6 +411,8 @@ class PlayHTHttpTTSService(TTSService):
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
yield TTSStoppedFrame()
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error generating TTS: {e}")
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -14,22 +14,18 @@ from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSAudioRawFrame,
|
||||
TTSSpeakFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import AudioContextWordTTSService, TTSService
|
||||
from pipecat.services.websocket_service import WebsocketService
|
||||
from pipecat.transcriptions.language import Language
|
||||
|
||||
try:
|
||||
@@ -58,7 +54,7 @@ def language_to_rime_language(language: Language) -> str:
|
||||
return LANGUAGE_MAP.get(language, "eng")
|
||||
|
||||
|
||||
class RimeTTSService(AudioContextWordTTSService, WebsocketService):
|
||||
class RimeTTSService(AudioContextWordTTSService):
|
||||
"""Text-to-Speech service using Rime's websocket API.
|
||||
|
||||
Uses Rime's websocket JSON API to convert text to speech with word-level timing
|
||||
@@ -95,17 +91,14 @@ class RimeTTSService(AudioContextWordTTSService, WebsocketService):
|
||||
params: Additional configuration parameters.
|
||||
"""
|
||||
# Initialize with parent class settings for proper frame handling
|
||||
AudioContextWordTTSService.__init__(
|
||||
self,
|
||||
super().__init__(
|
||||
aggregate_sentences=True,
|
||||
push_text_frames=False,
|
||||
push_stop_frames=True,
|
||||
stop_frame_timeout_s=2.0,
|
||||
pause_frame_processing=True,
|
||||
sample_rate=sample_rate,
|
||||
**kwargs,
|
||||
)
|
||||
WebsocketService.__init__(self)
|
||||
|
||||
# Store service configuration
|
||||
self._api_key = api_key
|
||||
@@ -176,11 +169,12 @@ class RimeTTSService(AudioContextWordTTSService, WebsocketService):
|
||||
|
||||
async def _disconnect(self):
|
||||
"""Close websocket connection and clean up tasks."""
|
||||
await self._disconnect_websocket()
|
||||
if self._receive_task:
|
||||
await self.cancel_task(self._receive_task)
|
||||
self._receive_task = None
|
||||
|
||||
await self._disconnect_websocket()
|
||||
|
||||
async def _connect_websocket(self):
|
||||
"""Connect to Rime websocket API with configured settings."""
|
||||
try:
|
||||
@@ -250,7 +244,9 @@ class RimeTTSService(AudioContextWordTTSService, WebsocketService):
|
||||
async def flush_audio(self):
|
||||
if not self._context_id or not self._websocket:
|
||||
return
|
||||
|
||||
logger.trace(f"{self}: flushing audio")
|
||||
await self._get_websocket().send(json.dumps({"text": " "}))
|
||||
self._context_id = None
|
||||
|
||||
async def _receive_messages(self):
|
||||
@@ -349,7 +345,8 @@ class RimeHttpTTSService(TTSService):
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
voice_id: str = "eva",
|
||||
voice_id: str,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
model: str = "mistv2",
|
||||
sample_rate: Optional[int] = None,
|
||||
params: InputParams = InputParams(),
|
||||
@@ -358,6 +355,7 @@ class RimeHttpTTSService(TTSService):
|
||||
super().__init__(sample_rate=sample_rate, **kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._session = aiohttp_session
|
||||
self._base_url = "https://users.rime.ai/v1/rime-tts"
|
||||
self._settings = {
|
||||
"speedAlpha": params.speed_alpha,
|
||||
@@ -391,36 +389,31 @@ class RimeHttpTTSService(TTSService):
|
||||
|
||||
try:
|
||||
await self.start_ttfb_metrics()
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
yield TTSStartedFrame()
|
||||
async with self._session.post(
|
||||
self._base_url, json=payload, headers=headers
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
error_message = f"Rime TTS error: HTTP {response.status}"
|
||||
logger.error(error_message)
|
||||
yield ErrorFrame(error=error_message)
|
||||
return
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(self._base_url, json=payload, headers=headers) as response:
|
||||
if response.status != 200:
|
||||
error_message = f"Rime TTS error: HTTP {response.status}"
|
||||
logger.error(error_message)
|
||||
yield ErrorFrame(error=error_message)
|
||||
return
|
||||
await self.start_tts_usage_metrics(text)
|
||||
|
||||
# Process the streaming response
|
||||
chunk_size = 8192
|
||||
first_chunk = True
|
||||
yield TTSStartedFrame()
|
||||
|
||||
async for chunk in response.content.iter_chunked(chunk_size):
|
||||
if first_chunk:
|
||||
await self.stop_ttfb_metrics()
|
||||
first_chunk = False
|
||||
|
||||
if chunk:
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
|
||||
yield TTSStoppedFrame()
|
||||
# Process the streaming response
|
||||
chunk_size = 8192
|
||||
|
||||
async for chunk in response.content.iter_chunked(chunk_size):
|
||||
if chunk:
|
||||
await self.stop_ttfb_metrics()
|
||||
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
|
||||
yield frame
|
||||
except Exception as e:
|
||||
logger.exception(f"Error generating TTS: {e}")
|
||||
yield ErrorFrame(error=f"Rime TTS error: {str(e)}")
|
||||
|
||||
finally:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield TTSStoppedFrame()
|
||||
|
||||
@@ -13,6 +13,7 @@ from loguru import logger
|
||||
from websockets.protocol import State
|
||||
|
||||
from pipecat.frames.frames import ErrorFrame
|
||||
from pipecat.utils.network import exponential_backoff_time
|
||||
|
||||
|
||||
class WebsocketService(ABC):
|
||||
@@ -51,27 +52,6 @@ class WebsocketService(ABC):
|
||||
await self._connect_websocket()
|
||||
return await self._verify_connection()
|
||||
|
||||
def _calculate_wait_time(
|
||||
self, attempt: int, min_wait: float = 4, max_wait: float = 10, multiplier: float = 1
|
||||
) -> float:
|
||||
"""Calculate exponential backoff wait time.
|
||||
|
||||
Args:
|
||||
attempt: Current attempt number (1-based)
|
||||
min_wait: Minimum wait time in seconds
|
||||
max_wait: Maximum wait time in seconds
|
||||
multiplier: Base multiplier for exponential calculation
|
||||
|
||||
Returns:
|
||||
Wait time in seconds
|
||||
"""
|
||||
try:
|
||||
exp = 2 ** (attempt - 1) * multiplier
|
||||
result = max(0, min(exp, max_wait))
|
||||
return max(min_wait, result)
|
||||
except (ValueError, ArithmeticError):
|
||||
return max_wait
|
||||
|
||||
async def _receive_task_handler(self, report_error: Callable[[ErrorFrame], Awaitable[None]]):
|
||||
"""Handles WebSocket message receiving with automatic retry logic.
|
||||
|
||||
@@ -104,20 +84,38 @@ class WebsocketService(ABC):
|
||||
try:
|
||||
if await self._reconnect_websocket(retry_count):
|
||||
retry_count = 0 # Reset counter on successful reconnection
|
||||
wait_time = self._calculate_wait_time(retry_count)
|
||||
wait_time = exponential_backoff_time(retry_count)
|
||||
await asyncio.sleep(wait_time)
|
||||
except Exception as reconnect_error:
|
||||
logger.error(f"{self} reconnection failed: {reconnect_error}")
|
||||
continue
|
||||
|
||||
@abstractmethod
|
||||
async def _connect(self):
|
||||
"""Implement service-specific connection logic. This function will
|
||||
connect to the websocket via _connect_websocket() among other connection
|
||||
logic."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def _disconnect(self):
|
||||
"""Implement service-specific disconnection logic. This function will
|
||||
disconnect to the websocket via _connect_websocket() among other
|
||||
connection logic.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def _connect_websocket(self):
|
||||
"""Implement service-specific websocket connection logic."""
|
||||
"""Implement service-specific websocket connection logic. This function
|
||||
should only connect to the websocket."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def _disconnect_websocket(self):
|
||||
"""Implement service-specific websocket disconnection logic."""
|
||||
"""Implement service-specific websocket disconnection logic. This
|
||||
function should only disconnect from the websocket."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
|
||||
@@ -174,11 +174,9 @@ class BaseInputTransport(FrameProcessor):
|
||||
async def _vad_analyze(self, audio_frame: InputAudioRawFrame) -> VADState:
|
||||
state = VADState.QUIET
|
||||
if self.vad_analyzer:
|
||||
logger.trace(f"{self}: analyzing VAD on {audio_frame}")
|
||||
state = await self.get_event_loop().run_in_executor(
|
||||
self._executor, self.vad_analyzer.analyze_audio, audio_frame.audio
|
||||
)
|
||||
logger.trace(f"{self}: done analyzing VAD on {audio_frame}")
|
||||
return state
|
||||
|
||||
async def _handle_vad(self, audio_frame: InputAudioRawFrame, vad_state: VADState):
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
@@ -30,7 +29,6 @@ class TransportParams(BaseModel):
|
||||
camera_out_framerate: int = 30
|
||||
camera_out_color_format: str = "RGB"
|
||||
audio_out_enabled: bool = False
|
||||
audio_out_is_live: bool = False
|
||||
audio_out_sample_rate: Optional[int] = None
|
||||
audio_out_channels: int = 1
|
||||
audio_out_bitrate: int = 96000
|
||||
|
||||
@@ -55,45 +55,89 @@ class FastAPIWebsocketCallbacks(BaseModel):
|
||||
on_session_timeout: Callable[[WebSocket], Awaitable[None]]
|
||||
|
||||
|
||||
class FastAPIWebsocketClient:
|
||||
def __init__(self, websocket: WebSocket, is_binary: bool, callbacks: FastAPIWebsocketCallbacks):
|
||||
self._websocket = websocket
|
||||
self._closing = False
|
||||
self._is_binary = is_binary
|
||||
self._callbacks = callbacks
|
||||
|
||||
def receive(self) -> typing.AsyncIterator[bytes | str]:
|
||||
return self._websocket.iter_bytes() if self._is_binary else self._websocket.iter_text()
|
||||
|
||||
async def send(self, data: str | bytes):
|
||||
if self._can_send():
|
||||
if self._is_binary:
|
||||
await self._websocket.send_bytes(data)
|
||||
else:
|
||||
await self._websocket.send_text(data)
|
||||
|
||||
async def disconnect(self):
|
||||
if self.is_connected and not self.is_closing:
|
||||
self._closing = True
|
||||
await self._websocket.close()
|
||||
await self.trigger_client_disconnected()
|
||||
|
||||
async def trigger_client_disconnected(self):
|
||||
await self._callbacks.on_client_disconnected(self._websocket)
|
||||
|
||||
async def trigger_client_connected(self):
|
||||
await self._callbacks.on_client_connected(self._websocket)
|
||||
|
||||
async def trigger_client_timout(self):
|
||||
await self._callbacks.on_session_timeout(self._websocket)
|
||||
|
||||
def _can_send(self):
|
||||
return self.is_connected and not self.is_closing
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self._websocket.client_state == WebSocketState.CONNECTED
|
||||
|
||||
@property
|
||||
def is_closing(self) -> bool:
|
||||
return self._closing
|
||||
|
||||
|
||||
class FastAPIWebsocketInputTransport(BaseInputTransport):
|
||||
def __init__(
|
||||
self,
|
||||
websocket: WebSocket,
|
||||
client: FastAPIWebsocketClient,
|
||||
params: FastAPIWebsocketParams,
|
||||
callbacks: FastAPIWebsocketCallbacks,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(params, **kwargs)
|
||||
|
||||
self._websocket = websocket
|
||||
self._client = client
|
||||
self._params = params
|
||||
self._callbacks = callbacks
|
||||
self._receive_task = None
|
||||
self._monitor_websocket_task = None
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._params.serializer.setup(frame)
|
||||
if self._params.session_timeout:
|
||||
self._monitor_websocket_task = self.create_task(self._monitor_websocket())
|
||||
await self._callbacks.on_client_connected(self._websocket)
|
||||
await self._client.trigger_client_connected()
|
||||
self._receive_task = self.create_task(self._receive_messages())
|
||||
|
||||
async def _stop_tasks(self):
|
||||
if self._monitor_websocket_task:
|
||||
await self.cancel_task(self._monitor_websocket_task)
|
||||
await self.cancel_task(self._receive_task)
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self.cancel_task(self._receive_task)
|
||||
await self._stop_tasks()
|
||||
await self._client.disconnect()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
await self.cancel_task(self._receive_task)
|
||||
|
||||
def _iter_data(self) -> typing.AsyncIterator[bytes | str]:
|
||||
if self._params.serializer.type == FrameSerializerType.BINARY:
|
||||
return self._websocket.iter_bytes()
|
||||
else:
|
||||
return self._websocket.iter_text()
|
||||
await self._stop_tasks()
|
||||
await self._client.disconnect()
|
||||
|
||||
async def _receive_messages(self):
|
||||
try:
|
||||
async for message in self._iter_data():
|
||||
async for message in self._client.receive():
|
||||
frame = await self._params.serializer.deserialize(message)
|
||||
|
||||
if not frame:
|
||||
@@ -106,19 +150,23 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
|
||||
|
||||
await self._callbacks.on_client_disconnected(self._websocket)
|
||||
await self._client.trigger_client_disconnected()
|
||||
|
||||
async def _monitor_websocket(self):
|
||||
"""Wait for self._params.session_timeout seconds, if the websocket is still open, trigger timeout event."""
|
||||
await asyncio.sleep(self._params.session_timeout)
|
||||
await self._callbacks.on_session_timeout(self._websocket)
|
||||
await self._client.trigger_client_timout()
|
||||
|
||||
|
||||
class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
def __init__(self, websocket: WebSocket, params: FastAPIWebsocketParams, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
client: FastAPIWebsocketClient,
|
||||
params: FastAPIWebsocketParams,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(params, **kwargs)
|
||||
|
||||
self._websocket = websocket
|
||||
self._client = client
|
||||
self._params = params
|
||||
|
||||
# write_raw_audio_frames() is called quickly, as soon as we get audio
|
||||
@@ -134,6 +182,14 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
await self._params.serializer.setup(frame)
|
||||
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self._client.disconnect()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
await self._client.disconnect()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
@@ -145,7 +201,10 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
await self._write_frame(frame)
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
if self._websocket.client_state != WebSocketState.CONNECTED:
|
||||
if self._client.is_closing:
|
||||
return
|
||||
|
||||
if not self._client.is_connected:
|
||||
# Simulate audio playback with a sleep.
|
||||
await self._write_audio_sleep()
|
||||
return
|
||||
@@ -172,25 +231,17 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
|
||||
await self._write_frame(frame)
|
||||
|
||||
self._websocket_audio_buffer = bytes()
|
||||
|
||||
# Simulate audio playback with a sleep.
|
||||
await self._write_audio_sleep()
|
||||
|
||||
async def _write_frame(self, frame: Frame):
|
||||
try:
|
||||
payload = await self._params.serializer.serialize(frame)
|
||||
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
|
||||
await self._send_data(payload)
|
||||
if payload:
|
||||
await self._client.send(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception sending data: {e.__class__.__name__} ({e})")
|
||||
|
||||
def _send_data(self, data: str | bytes):
|
||||
if self._params.serializer.type == FrameSerializerType.BINARY:
|
||||
return self._websocket.send_bytes(data)
|
||||
else:
|
||||
return self._websocket.send_text(data)
|
||||
|
||||
async def _write_audio_sleep(self):
|
||||
# Simulate a clock.
|
||||
current_time = time.monotonic()
|
||||
@@ -219,11 +270,14 @@ class FastAPIWebsocketTransport(BaseTransport):
|
||||
on_session_timeout=self._on_session_timeout,
|
||||
)
|
||||
|
||||
is_binary = self._params.serializer.type == FrameSerializerType.BINARY
|
||||
self._client = FastAPIWebsocketClient(websocket, is_binary, self._callbacks)
|
||||
|
||||
self._input = FastAPIWebsocketInputTransport(
|
||||
websocket, self._params, self._callbacks, name=self._input_name
|
||||
self._client, self._params, name=self._input_name
|
||||
)
|
||||
self._output = FastAPIWebsocketOutputTransport(
|
||||
websocket, self._params, name=self._output_name
|
||||
self._client, self._params, name=self._output_name
|
||||
)
|
||||
|
||||
# Register supported handlers. The user will only be able to register
|
||||
|
||||
@@ -329,6 +329,10 @@ class DailyTransportClient(EventHandler):
|
||||
def _speaker_name(self):
|
||||
return f"speaker-{self}"
|
||||
|
||||
@property
|
||||
def room_url(self) -> str:
|
||||
return self._room_url
|
||||
|
||||
@property
|
||||
def participant_id(self) -> str:
|
||||
return self._participant_id
|
||||
@@ -1112,6 +1116,10 @@ class DailyTransport(BaseTransport):
|
||||
# DailyTransport
|
||||
#
|
||||
|
||||
@property
|
||||
def room_url(self) -> str:
|
||||
return self._client.room_url
|
||||
|
||||
@property
|
||||
def participant_id(self) -> str:
|
||||
return self._client.participant_id
|
||||
|
||||
27
src/pipecat/utils/network.py
Normal file
27
src/pipecat/utils/network.py
Normal file
@@ -0,0 +1,27 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
def exponential_backoff_time(
|
||||
attempt: int, min_wait: float = 4, max_wait: float = 10, multiplier: float = 1
|
||||
) -> float:
|
||||
"""Calculate exponential backoff wait time.
|
||||
|
||||
Args:
|
||||
attempt: Current attempt number (1-based)
|
||||
min_wait: Minimum wait time in seconds
|
||||
max_wait: Maximum wait time in seconds
|
||||
multiplier: Base multiplier for exponential calculation
|
||||
|
||||
Returns:
|
||||
Wait time in seconds
|
||||
"""
|
||||
try:
|
||||
exp = 2 ** (attempt - 1) * multiplier
|
||||
result = max(0, min(exp, max_wait))
|
||||
return max(min_wait, result)
|
||||
except (ValueError, ArithmeticError):
|
||||
return max_wait
|
||||
@@ -13,8 +13,8 @@ ENDOFSENTENCE_PATTERN_STR = r"""
|
||||
(?<!Mr|Ms|Dr) # Negative lookbehind: not preceded by Mr, Ms, Dr (combined bc. length is the same)
|
||||
(?<!Mrs) # Negative lookbehind: not preceded by "Mrs"
|
||||
(?<!Prof) # Negative lookbehind: not preceded by "Prof"
|
||||
[\.\?\!;]| # Match a period, question mark, exclamation point, or semicolon
|
||||
[。?!;।] # the full-width version (mainly used in East Asian languages such as Chinese, Hindi)
|
||||
(\.\s*\.\s*\.|[\.\?\!;])| # Match a period, question mark, exclamation point, or semicolon
|
||||
(\。\s*\。\s*\。|[。?!;।]) # the full-width version (mainly used in East Asian languages such as Chinese, Hindi)
|
||||
$ # End of string
|
||||
"""
|
||||
ENDOFSENTENCE_PATTERN = re.compile(ENDOFSENTENCE_PATTERN_STR, re.VERBOSE)
|
||||
|
||||
@@ -2,6 +2,7 @@ aiohttp~=3.10.3
|
||||
anthropic~=0.30.0
|
||||
azure-cognitiveservices-speech~=1.40.0
|
||||
boto3~=1.35.27
|
||||
cartesia~=1.3.1
|
||||
daily-python~=0.11.0
|
||||
deepgram-sdk~=3.5.0
|
||||
fal-client~=0.4.1
|
||||
@@ -15,6 +16,7 @@ langchain~=0.2.14
|
||||
livekit~=0.13.1
|
||||
lmnt~=1.1.4
|
||||
loguru~=0.7.2
|
||||
Markdown~=3.7
|
||||
numpy~=1.26.4
|
||||
openai~=1.37.2
|
||||
openpipe~=4.24.0
|
||||
@@ -28,5 +30,4 @@ silero-vad~=5.1
|
||||
soxr~=0.5.0
|
||||
together~=1.2.7
|
||||
transformers~=4.48.0
|
||||
websockets~=13.1
|
||||
Markdown~=3.7
|
||||
websockets~=13.1
|
||||
@@ -11,12 +11,13 @@ from pipecat.frames.frames import (
|
||||
BotStoppedSpeakingFrame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
InputAudioRawFrame,
|
||||
STTMuteFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.processors.filters.stt_mute_filter import STTMuteConfig, STTMuteFilter, STTMuteStrategy
|
||||
from pipecat.tests.utils import run_test
|
||||
from pipecat.tests.utils import SleepFrame, run_test
|
||||
|
||||
|
||||
class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
@@ -26,10 +27,14 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
frames_to_send = [
|
||||
BotStartedSpeakingFrame(), # First bot speech starts
|
||||
UserStartedSpeakingFrame(), # Should be suppressed
|
||||
InputAudioRawFrame(
|
||||
audio=b"", sample_rate=16000, num_channels=1
|
||||
), # Should be suppressed
|
||||
UserStoppedSpeakingFrame(), # Should be suppressed
|
||||
BotStoppedSpeakingFrame(), # First bot speech ends
|
||||
BotStartedSpeakingFrame(), # Second bot speech
|
||||
UserStartedSpeakingFrame(), # Should pass through
|
||||
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
|
||||
UserStoppedSpeakingFrame(), # Should pass through
|
||||
BotStoppedSpeakingFrame(),
|
||||
]
|
||||
@@ -41,6 +46,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
STTMuteFrame, # mute=False
|
||||
BotStartedSpeakingFrame,
|
||||
UserStartedSpeakingFrame, # Now passes through
|
||||
InputAudioRawFrame, # Now passes through
|
||||
UserStoppedSpeakingFrame, # Now passes through
|
||||
BotStoppedSpeakingFrame,
|
||||
]
|
||||
@@ -57,12 +63,19 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
frames_to_send = [
|
||||
BotStartedSpeakingFrame(), # First speech starts
|
||||
UserStartedSpeakingFrame(), # Should be suppressed
|
||||
InputAudioRawFrame(
|
||||
audio=b"", sample_rate=16000, num_channels=1
|
||||
), # Should be suppressed
|
||||
UserStoppedSpeakingFrame(), # Should be suppressed
|
||||
BotStoppedSpeakingFrame(), # First speech ends
|
||||
UserStartedSpeakingFrame(), # Should pass through
|
||||
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
|
||||
UserStoppedSpeakingFrame(), # Should pass through
|
||||
BotStartedSpeakingFrame(), # Second speech starts
|
||||
UserStartedSpeakingFrame(), # Should be suppressed again
|
||||
InputAudioRawFrame(
|
||||
audio=b"", sample_rate=16000, num_channels=1
|
||||
), # Should be suppressed again
|
||||
UserStoppedSpeakingFrame(), # Should be suppressed again
|
||||
BotStoppedSpeakingFrame(), # Second speech ends
|
||||
]
|
||||
@@ -73,6 +86,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
BotStoppedSpeakingFrame,
|
||||
STTMuteFrame, # mute=False
|
||||
UserStartedSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
BotStartedSpeakingFrame,
|
||||
STTMuteFrame, # mute=True
|
||||
@@ -134,15 +148,23 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
frames_to_send = [
|
||||
UserStartedSpeakingFrame(), # Should be suppressed (starts muted)
|
||||
InputAudioRawFrame(
|
||||
audio=b"", sample_rate=16000, num_channels=1
|
||||
), # Should be suppressed
|
||||
UserStoppedSpeakingFrame(), # Should be suppressed
|
||||
BotStartedSpeakingFrame(), # First bot speech
|
||||
UserStartedSpeakingFrame(), # Should be suppressed
|
||||
InputAudioRawFrame(
|
||||
audio=b"", sample_rate=16000, num_channels=1
|
||||
), # Should be suppressed
|
||||
UserStoppedSpeakingFrame(), # Should be suppressed
|
||||
BotStoppedSpeakingFrame(), # First speech ends, unmutes
|
||||
UserStartedSpeakingFrame(), # Should pass through
|
||||
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
|
||||
UserStoppedSpeakingFrame(), # Should pass through
|
||||
BotStartedSpeakingFrame(), # Second speech
|
||||
UserStartedSpeakingFrame(), # Should pass through
|
||||
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
|
||||
UserStoppedSpeakingFrame(), # Should pass through
|
||||
BotStoppedSpeakingFrame(),
|
||||
]
|
||||
@@ -153,9 +175,11 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
BotStoppedSpeakingFrame,
|
||||
STTMuteFrame, # mute=False after first speech
|
||||
UserStartedSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
BotStartedSpeakingFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
]
|
||||
@@ -190,23 +214,30 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
frames_to_send = [
|
||||
UserStartedSpeakingFrame(), # Should pass through
|
||||
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
|
||||
UserStoppedSpeakingFrame(), # Should pass through
|
||||
BotStartedSpeakingFrame(), # Bot starts speaking
|
||||
UserStartedSpeakingFrame(), # Should be suppressed
|
||||
InputAudioRawFrame(
|
||||
audio=b"", sample_rate=16000, num_channels=1
|
||||
), # Should be suppressed
|
||||
UserStoppedSpeakingFrame(), # Should be suppressed
|
||||
BotStoppedSpeakingFrame(), # Bot stops speaking
|
||||
UserStartedSpeakingFrame(), # Should pass through
|
||||
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
|
||||
UserStoppedSpeakingFrame(), # Should pass through
|
||||
]
|
||||
|
||||
expected_returned_frames = [
|
||||
UserStartedSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
BotStartedSpeakingFrame,
|
||||
STTMuteFrame, # mute=True
|
||||
BotStoppedSpeakingFrame,
|
||||
STTMuteFrame, # mute=False
|
||||
UserStartedSpeakingFrame,
|
||||
InputAudioRawFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
]
|
||||
|
||||
|
||||
34
tests/test_utils_network.py
Normal file
34
tests/test_utils_network.py
Normal file
@@ -0,0 +1,34 @@
|
||||
#
|
||||
# Copyright (c) 2024-2025 Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import unittest
|
||||
|
||||
from pipecat.utils.network import exponential_backoff_time
|
||||
|
||||
|
||||
class TestUtilsNetwork(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_exponential_backoff_time(self):
|
||||
# min_wait=4, max_wait=10, multiplier=1
|
||||
assert exponential_backoff_time(attempt=1, min_wait=4, max_wait=10, multiplier=1) == 4
|
||||
assert exponential_backoff_time(attempt=2, min_wait=4, max_wait=10, multiplier=1) == 4
|
||||
assert exponential_backoff_time(attempt=3, min_wait=4, max_wait=10, multiplier=1) == 4
|
||||
assert exponential_backoff_time(attempt=4, min_wait=4, max_wait=10, multiplier=1) == 8
|
||||
assert exponential_backoff_time(attempt=5, min_wait=4, max_wait=10, multiplier=1) == 10
|
||||
assert exponential_backoff_time(attempt=6, min_wait=4, max_wait=10, multiplier=1) == 10
|
||||
# min_wait=1, max_wait=10, multiplier=1
|
||||
assert exponential_backoff_time(attempt=1, min_wait=1, max_wait=10, multiplier=1) == 1
|
||||
assert exponential_backoff_time(attempt=2, min_wait=1, max_wait=10, multiplier=1) == 2
|
||||
assert exponential_backoff_time(attempt=3, min_wait=1, max_wait=10, multiplier=1) == 4
|
||||
assert exponential_backoff_time(attempt=4, min_wait=1, max_wait=10, multiplier=1) == 8
|
||||
assert exponential_backoff_time(attempt=5, min_wait=1, max_wait=10, multiplier=1) == 10
|
||||
assert exponential_backoff_time(attempt=6, min_wait=1, max_wait=10, multiplier=1) == 10
|
||||
# min_wait=1, max_wait=20, multiplier=2
|
||||
assert exponential_backoff_time(attempt=1, min_wait=1, max_wait=20, multiplier=2) == 2
|
||||
assert exponential_backoff_time(attempt=2, min_wait=1, max_wait=20, multiplier=2) == 4
|
||||
assert exponential_backoff_time(attempt=3, min_wait=1, max_wait=20, multiplier=2) == 8
|
||||
assert exponential_backoff_time(attempt=4, min_wait=1, max_wait=20, multiplier=2) == 16
|
||||
assert exponential_backoff_time(attempt=5, min_wait=1, max_wait=20, multiplier=2) == 20
|
||||
assert exponential_backoff_time(attempt=6, min_wait=1, max_wait=20, multiplier=2) == 20
|
||||
@@ -11,10 +11,13 @@ from pipecat.utils.string import match_endofsentence
|
||||
|
||||
class TestUtilsString(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_endofsentence(self):
|
||||
assert match_endofsentence("This is a sentence.")
|
||||
assert match_endofsentence("This is a sentence! ")
|
||||
assert match_endofsentence("This is a sentence?")
|
||||
assert match_endofsentence("This is a sentence;")
|
||||
assert match_endofsentence("This is a sentence.") == 19
|
||||
assert match_endofsentence("This is a sentence!") == 19
|
||||
assert match_endofsentence("This is a sentence?") == 19
|
||||
assert match_endofsentence("This is a sentence;") == 19
|
||||
assert match_endofsentence("This is a sentence...") == 21
|
||||
assert match_endofsentence("This is a sentence . . .") == 24
|
||||
assert match_endofsentence("This is a sentence. ..") == 22
|
||||
assert not match_endofsentence("This is not a sentence")
|
||||
assert not match_endofsentence("This is not a sentence,")
|
||||
assert not match_endofsentence("This is not a sentence, ")
|
||||
|
||||
Reference in New Issue
Block a user