Compare commits

...

69 Commits

Author SHA1 Message Date
macaki
713d20e4fc formatting 2025-03-12 14:47:10 -04:00
macaki
adc45bd282 [rime client] Sending over trailing space to help indicate end of utterance after a punctuation. 2025-03-12 14:46:54 -04:00
Aleix Conchillo Flaqué
fc544fa61c Merge pull request #1272 from pipecat-ai/aleix/tts-websocket-interruptions
services: fix some TTS websocket service interruption handling
2025-02-24 14:54:41 -08:00
Mark Backman
976fe95304 Merge pull request #1279 from pipecat-ai/mb/remove-open-optional-dep
Remove `openai` optional dependency from services as it's now required
2025-02-24 17:42:53 -05:00
Aleix Conchillo Flaqué
408270b647 lmnt: don't send "eof" before closing the socket 2025-02-24 14:37:37 -08:00
Mark Backman
1dfb75bc9d Merge pull request #1278 from pipecat-ai/mb/claude-3-7
Update AnthropicLLMService to use claude-3-7-sonnet-20250219 by default
2025-02-24 15:41:28 -05:00
Mark Backman
cefc2a1088 Fix test-requirements.text ordering 2025-02-24 15:06:13 -05:00
Mark Backman
3b9b9200ea Remove openai optional dependency from services as it's now required 2025-02-24 15:05:42 -05:00
Mark Backman
d6f29a0f4b Update AnthropicLLMService to use claude-3-7-sonnet-20250219 by default 2025-02-24 14:32:00 -05:00
Aleix Conchillo Flaqué
5b762d11ef Merge pull request #1228 from CarlKho-Minerva/main
Missing Cartesia~=1.3.1 → `test-requirements`
2025-02-24 08:47:41 -08:00
Aleix Conchillo Flaqué
2f3e2da6b9 Merge pull request #1259 from pipecat-ai/openai-not-optional
Since the `openai` package is used by pretty much everything in pipec…
2025-02-24 08:45:45 -08:00
allenmylath
45058d4a94 Update audio_buffer_processor.py (#1266) 2025-02-24 08:41:19 -08:00
Aleix Conchillo Flaqué
5b637bd826 services: fix some TTS websocket service interruption handling 2025-02-24 08:37:22 -08:00
Mark Backman
2d4fd7e903 Merge pull request #1274 from pipecat-ai/mb/add-ellipsis-test
Add one additional ellipsis test to test_utils_string
2025-02-23 11:26:20 -05:00
Mark Backman
b5662520aa Add one additional ellipsis test to test_utils_string 2025-02-23 11:04:24 -05:00
Aleix Conchillo Flaqué
af45c170b5 Merge pull request #1264 from pipecat-ai/aleix/add-log-observers
add initial log observers
2025-02-21 15:20:45 -08:00
Aleix Conchillo Flaqué
65f548b2ec examples(30-observer): update to use LLMLogObserver 2025-02-21 15:15:16 -08:00
Aleix Conchillo Flaqué
b29ab8c608 observers: add LLMLogObserver and TranscriptionLogObserver 2025-02-21 15:15:16 -08:00
Aleix Conchillo Flaqué
d6dc37f0b6 Merge pull request #1269 from pipecat-ai/aleix/endofsentence-support-ellipses
utils: add support for ellipses in match_endofsentence()
2025-02-21 15:08:22 -08:00
Aleix Conchillo Flaqué
12bce2e8c0 utils: add support for ellipses in match_endofsentence() 2025-02-21 15:05:50 -08:00
Aleix Conchillo Flaqué
4acf7296e0 Merge pull request #1261 from pipecat-ai/aleix/emualted-frames-being-triggered-prematurely
LLMUserContextAggregator: don't reset timer with interim transcription
2025-02-21 10:15:28 -08:00
Aleix Conchillo Flaqué
98706d429c LLMUserContextAggregator: make sure incoming transcription has text 2025-02-21 10:12:54 -08:00
Aleix Conchillo Flaqué
41720b1a13 LLMUserContextAggregator: don't reset timer with interim transcription
It turns out that in some cases we only get interim transcriptions (e.g. someone
is speaking very very softly or someone is talking in the background). In those
cases we don't want to interrupt the bot because there's really nothing to
interrupt the bot for.

We originally thought we should interrupt the bot right at the time we got an
interim frame, but this is causing too many false positives. It's actually
better to simply wait for a real transcription before interrupting (in case VAD
didn't interrupt).
2025-02-21 09:05:56 -08:00
Aleix Conchillo Flaqué
3ef4245166 Merge pull request #1265 from pipecat-ai/aleix/transport-remove-audio-out-is-live 2025-02-21 06:51:09 -08:00
Filipi da Silva Fuchter
3bb0797922 Merge pull request #1257 from pipecat-ai/fastapi_disconnect_issue
Fixed an issue where FastAPI was not triggering on_client_disconnected.
2025-02-21 09:15:15 -03:00
Filipi Fuchter
7c7b4c52af Fixed an issue where EndTaskFrame was not triggering on_client_disconnected or closing the WebSocket in FastAPI. 2025-02-21 09:11:58 -03:00
Aleix Conchillo Flaqué
01f083b7fc transports: remove TransportParams.audio_out_is_live 2025-02-20 23:33:06 -08:00
Aleix Conchillo Flaqué
91fcaebe25 Merge pull request #1263 from Vaibhav159/vl_fix_deepgram_sample_rate_mismatch
fixing deepgram mismatch
2025-02-20 22:39:06 -08:00
Vaibhav159
9c5fe5c85e fixing deepgram mismatch 2025-02-21 09:32:40 +05:30
Aleix Conchillo Flaqué
7e5e167a4b Merge pull request #1250 from pipecat-ai/aleix/context-aggregation-simulatenous-text-tools
AssistantContextAggregator: append aggregation and tools in the same turn
2025-02-20 17:32:57 -08:00
Aleix Conchillo Flaqué
d04c4b36f3 AssistantContextAggregator: append aggregation and tools in the same turn 2025-02-20 17:29:43 -08:00
Aleix Conchillo Flaqué
a811e53626 Merge pull request #1253 from pipecat-ai/aleix/http-tts-services-stopped-frame
HTTP TTS services stopped frame
2025-02-20 17:28:05 -08:00
Paul Kompfner
df57202a05 Since the openai package is used by pretty much everything in pipecat (due to OpenAILLMContext being the standard context representation), let's make it a non-optional dependency.
This change solves an issue faced by users who aren't intending to use OpenAI getting scary error messages saying that they need the `openai` optional dependency "in order to use OpenAI", along with an instruction to set the OPENAI_API_KEY environment variable.

Note that with this change we could theoretically remove from pyproject.toml a number of defined optional dependencies that list only the `openai` package as a dependency (like `deepseek`, for example), but I didn't want to "break the API" in terms of how users install/consume pipecat and its set of built-in services.

Finally, I removed the `python-deepcompare` dependency from the `openai` optional dependency, since it appears to me like it was added by mistake (my guess is it was used for debugging during development and then never removed).
2025-02-20 15:21:35 -05:00
Aleix Conchillo Flaqué
69e6f3fdb7 rime: pass aiohttp session to constructor 2025-02-20 07:36:24 -08:00
Aleix Conchillo Flaqué
6809254963 tts: fix metrics and TTSStoppedFrame frame in HTTP services
Fixes #1247
2025-02-20 07:36:21 -08:00
Aleix Conchillo Flaqué
81093d3bed Merge pull request #1252 from pipecat-ai/aleix/remove-vad-extra-logging
BaseInputTransport: remove VAD logging
2025-02-20 07:32:20 -08:00
Aleix Conchillo Flaqué
d9a67164f6 Merge pull request #1251 from pipecat-ai/aleix/fish-tts-service-push-stop-frame
FishAudioTTSService should push TTSStoppedFrame
2025-02-20 07:32:05 -08:00
Aleix Conchillo Flaqué
98259af54e update CHANGELOG 2025-02-19 22:05:48 -08:00
Dominic Stewart
039d144c79 examples(phone-bot): updated example to use Gemini (#1233) 2025-02-19 22:03:37 -08:00
Aleix Conchillo Flaqué
d0f67fc189 BaseInputTransport: remove VAD logging
These logs are very verbose. They were added to try to find an issue that
resulted in being because of low CPU/memory resources, but these logs were not
helpful to determine that.
2025-02-19 21:55:11 -08:00
Aleix Conchillo Flaqué
6e3f96aa83 fish: automatically send TTSStoppedFrame after timeout 2025-02-19 21:41:18 -08:00
Aleix Conchillo Flaqué
293677588d tts: make push_stop_frames default to 2.0s 2025-02-19 21:39:00 -08:00
Filipi da Silva Fuchter
77e777b1ce Merge pull request #1249 from pipecat-ai/invoking_call_start_function
Fixed an issue that `start_callback` was not invoked for some LLM services
2025-02-19 18:09:00 -03:00
Filipi Fuchter
7e7926059c Fixed an issue that start_callback was not invoked for some LLM services. 2025-02-19 18:04:20 -03:00
Aleix Conchillo Flaqué
c948754eff Merge pull request #1248 from pipecat-ai/aleix/daily-transport-room-url
daily: add room_url property
2025-02-19 09:46:46 -08:00
Aleix Conchillo Flaqué
83f1a8830d daily: add room_url property 2025-02-19 09:29:53 -08:00
James Hush
80f8e05fcf docs: fix transcripts in translation chatbot example (#1199) 2025-02-19 16:07:22 +08:00
Aleix Conchillo Flaqué
afd1a1e80b Merge pull request #1245 from pipecat-ai/aleix/stt-mute-filter-trace-logging 2025-02-18 21:21:55 -08:00
Aleix Conchillo Flaqué
84ac88cad7 STTMuteFilter: change suppressed logging to trace 2025-02-18 18:03:37 -08:00
Aleix Conchillo Flaqué
211163e5c7 Merge pull request #1241 from pipecat-ai/aleix/deepgram-nova-3
deepgram: use the new nova-3 model as default
2025-02-18 17:53:04 -08:00
Aleix Conchillo Flaqué
1b0bcebef6 deepgram: use the new nova-3 model as default 2025-02-18 17:51:54 -08:00
Aleix Conchillo Flaqué
89736b03c4 Merge pull request #1243 from pipecat-ai/aleix/add-deepgram-addons
deepgram: add ability to provide custom addons
2025-02-18 17:47:48 -08:00
Aleix Conchillo Flaqué
4edda718ed deepgram: add ability to provide custom addons 2025-02-18 17:45:41 -08:00
Aleix Conchillo Flaqué
22a62edc9e Merge pull request #1242 from pipecat-ai/aleix/utils-network-exponential
network: added exponential_backoff_time() function
2025-02-18 17:44:21 -08:00
Aleix Conchillo Flaqué
50b6cc8135 network: added exponential_backoff_time() function 2025-02-18 17:42:43 -08:00
Aleix Conchillo Flaqué
45cf36925a Merge pull request #1240 from pipecat-ai/aleix/handle-deepgram-on-error
deepgram: handle error event and reconnect
2025-02-18 17:41:29 -08:00
Filipi da Silva Fuchter
83a71e1fec Merge pull request #1112 from pipecat-ai/bot-ready-signalling-rn
React Native client for the bot ready example.
2025-02-18 15:17:38 -03:00
Filipi Fuchter
e809c8680e Upgrading to use the latest node stable version 2025-02-18 15:12:44 -03:00
Aleix Conchillo Flaqué
c926063d74 deepgram: handle error event and reconnect 2025-02-18 09:52:18 -08:00
Aleix Conchillo Flaqué
0334550356 Merge pull request #1238 from pipecat-ai/aleix/stt-mute-filter-ignore-input-audio-frames
STTMuteFilter: ignore audio frames so no transcriptions are generated
2025-02-18 09:48:13 -08:00
Aleix Conchillo Flaqué
90b9dce710 STTMuteFilter: ignore audio frames so no transcriptions are generated 2025-02-17 19:59:05 -08:00
Carl Kho
a5cdd5f1b8 Add Cartesia API key to dot-env.template 2025-02-14 21:29:37 -08:00
Carl Kho
5f937b8479 Update test requirements to include Cartesia version 1.3.1 2025-02-14 21:14:32 -08:00
Filipi Fuchter
7e3e126730 Migrating the base API URL for the react native example to an .env file. 2025-01-30 10:42:16 -03:00
Filipi Fuchter
75ca0571bb Improving the layout from the bot ready react native demo. 2025-01-30 10:31:04 -03:00
Filipi Fuchter
a48e5d0714 Only sending the message when it is a remote audio track. 2025-01-30 10:14:37 -03:00
Filipi Fuchter
2b6a992207 Sending the app-message to start playing audio once the track has started. 2025-01-30 09:37:33 -03:00
Filipi Fuchter
24cf106ed2 Refactoring the code to ask for the room that it should connect. 2025-01-30 09:14:18 -03:00
Filipi Fuchter
95c8346cb5 Starting to create a react native client for the bot ready example. 2025-01-29 19:00:42 -03:00
65 changed files with 12352 additions and 358 deletions

15
.gitignore vendored
View File

@@ -32,6 +32,21 @@ fly.toml
# Example files
pipecat/examples/twilio-chatbot/templates/streams.xml
pipecat/examples/bot-ready-signalling/client/react-native/node_modules/
pipecat/examples/bot-ready-signalling/client/react-native/.expo/
pipecat/examples/bot-ready-signalling/client/react-native/dist/
pipecat/examples/bot-ready-signalling/client/react-native/npm-debug.*
pipecat/examples/bot-ready-signalling/client/react-native/*.jks
pipecat/examples/bot-ready-signalling/client/react-native/*.p8
pipecat/examples/bot-ready-signalling/client/react-native/*.p12
pipecat/examples/bot-ready-signalling/client/react-native/*.key
pipecat/examples/bot-ready-signalling/client/react-native/*.mobileprovision
pipecat/examples/bot-ready-signalling/client/react-native/*.orig.*
pipecat/examples/bot-ready-signalling/client/react-native/web-build/
# macOS
.DS_Store
# Documentation
docs/api/_build/

View File

@@ -5,6 +5,82 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### Added
- Added new log observers `LLMLogObserver` and `TranscriptionLogObserver` that
can be useful for debugging your pipelines.
- Added `room_url` property to `DailyTransport`.
- Added `addons` argument to `DeepgramSTTService`.
- Added `exponential_backoff_time()` to `utils.network` module.
### Changed
- `AnthropicLLMService` now uses `claude-3-7-sonnet-20250219` as the default
model.
- `RimeHttpTTSService` needs an `aiohttp.ClientSession` to be passed to the
constructor as all the other HTTP-based services.
- `RimeHttpTTSService` doesn't use a default voice anymore.
- `DeepgramSTTService` now uses the new `nova-3` model by default. If you want
to use the previous model you can pass `LiveOptions(model="nova-2-general")`.
(see https://deepgram.com/learn/introducing-nova-3-speech-to-text-api)
```python
stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
```
### Removed
- Remove `TransportParams.audio_out_is_live` since it was not being used at all.
### Fixed
- Fixed a `ElevenLabsTTSService`, `FishAudioTTSService`, `LMNTTTSService` and
`PlayHTTTSService` issue that was resulting in audio requested before an
interruption being played after an interruption.
- Fixed `match_endofsentence` support for ellipses.
- Fixed an issue that would cause undesired interruptions via
`EmulateUserStartedSpeakingFrame` when only interim transcriptions (i.e. no
final transcriptions) where received.
- Fixed an issue where `EndTaskFrame` was not triggering
`on_client_disconnected` or closing the WebSocket in FastAPI.
- Fixed an issue in `DeepgramSTTService` where the `sample_rate` passed to the
`LiveOptions` was not being used, causing the service to use the default
sample rate of pipeline.
- Fixed a context aggregator issue that would not append the LLM text response
to the context if a function call happened in the same LLM turn.
- Fixed an issue that was causing HTTP TTS services to push `TTSStoppedFrame`
more than once.
- Fixed a `FishAudioTTSService` issue where `TTSStoppedFrame` was not being
pushed.
- Fixed an issue that `start_callback` was not invoked for some LLM services.
- Fixed an issue that would cause `DeepgramSTTService` to stop working after an
error occurred (e.g. sudden network loss). If the network recovered we would
not reconnect.
- Fixed a `STTMuteFilter` issue that would not mute user audio frames causing
transcriptions to be generated by the STT service.
### Other
- Added Gemini support to `examples/phone-chatbot`.
## [0.0.57] - 2025-02-14
### Added

View File

@@ -18,6 +18,9 @@ AZURE_DALLE_API_KEY=...
AZURE_DALLE_ENDPOINT=https://...
AZURE_DALLE_MODEL=...
# Cartesia
CARTESIA_API_KEY=...
# Daily
DAILY_API_KEY=...
DAILY_SAMPLE_ROOM_URL=https://...

View File

@@ -0,0 +1 @@
22.14

View File

@@ -0,0 +1,60 @@
# React Native Implementation
Basic implementation using the [Pipecat React Native SDK](https://docs.pipecat.ai/client/react-native/introduction).
## Usage
### Expo requirements
This project cannot be used with an [Expo Go](https://docs.expo.dev/workflow/expo-go/) app because [it requires custom native code](https://docs.expo.io/workflow/customizing/).
When a project requires custom native code or a config plugin, we need to transition from using [Expo Go](https://docs.expo.dev/workflow/expo-go/)
to a [development build](https://docs.expo.dev/development/introduction/).
More details about the custom native code used by this demo can be found in [rn-daily-js-expo-config-plugin](https://github.com/daily-co/rn-daily-js-expo-config-plugin).
### Building remotely
If you do not have experience with Xcode and Android Studio builds or do not have them installed locally on your computer, you will need to follow [this guide from Expo to use EAS Build](https://docs.expo.dev/development/create-development-builds/#create-and-install-eas-build).
### Building locally
You will need to have installed locally on your computer:
- [Xcode](https://developer.apple.com/xcode/) to build for iOS;
- [Android Studio](https://developer.android.com/studio) to build for Android;
#### Install the demo dependencies
```bash
# Use the version of node specified in .nvmrc
nvm i
# Install dependencies
npm i
# Before a native app can be compiled, the native source code must be generated.
npx expo prebuild
# Configure the environment variable to connect to the local server
cp env.example .env
# edit .env and add your local ip address, for example: http://192.168.1.16:7860
```
#### Running on Android
After plugging in an Android device [configured for debugging](https://developer.android.com/studio/debug/dev-options), run the following command:
```
npm run android
```
#### Running on iOS
Run the following command:
```
npm run ios
```
#### Connect to the server
Use the http://localhost:5173 in your app.

View File

@@ -0,0 +1,75 @@
{
"expo": {
"name": "bot-ready-rn",
"slug": "bot-ready-rn",
"version": "1.0.0",
"orientation": "portrait",
"icon": "./assets/icon.png",
"userInterfaceStyle": "light",
"splash": {
"image": "./assets/splash.png",
"resizeMode": "contain",
"backgroundColor": "#ffffff"
},
"updates": {
"fallbackToCacheTimeout": 0
},
"assetBundlePatterns": [
"**/*"
],
"ios": {
"supportsTablet": true,
"bitcode": false,
"bundleIdentifier": "co.daily.expo.BotReady",
"infoPlist": {
"UIBackgroundModes": [
"voip"
]
},
"appleTeamId": "EEBGKV9N3N"
},
"android": {
"adaptiveIcon": {
"foregroundImage": "./assets/adaptive-icon.png",
"backgroundColor": "#FFFFFF"
},
"package": "co.daily.expo.BotReady",
"permissions": [
"android.permission.ACCESS_NETWORK_STATE",
"android.permission.BLUETOOTH",
"android.permission.CAMERA",
"android.permission.INTERNET",
"android.permission.MODIFY_AUDIO_SETTINGS",
"android.permission.RECORD_AUDIO",
"android.permission.SYSTEM_ALERT_WINDOW",
"android.permission.WAKE_LOCK",
"android.permission.FOREGROUND_SERVICE",
"android.permission.FOREGROUND_SERVICE_CAMERA",
"android.permission.FOREGROUND_SERVICE_MICROPHONE",
"android.permission.FOREGROUND_SERVICE_MEDIA_PROJECTION",
"android.permission.POST_NOTIFICATIONS"
]
},
"web": {
"favicon": "./assets/favicon.png"
},
"plugins": [
"@config-plugins/react-native-webrtc",
"@daily-co/config-plugin-rn-daily-js",
[
"expo-build-properties",
{
"android": {
"minSdkVersion": 24,
"compileSdkVersion": 35,
"targetSdkVersion": 34,
"buildToolsVersion": "35.0.0"
},
"ios": {
"deploymentTarget": "15.1"
}
}
]
]
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

View File

@@ -0,0 +1,7 @@
module.exports = function(api) {
api.cache(true);
return {
presets: ['babel-preset-expo'],
plugins: [["module:react-native-dotenv"]],
};
};

View File

@@ -0,0 +1 @@
API_BASE_URL=http://YOUR_LOCAL_IP:7860

View File

@@ -0,0 +1,7 @@
import { registerRootComponent } from "expo";
import App from "./src/App";
// registerRootComponent calls AppRegistry.registerComponent('main', () => App);
// It also ensures that the environment is set up appropriately
registerRootComponent(App);

View File

@@ -0,0 +1,4 @@
// Learn more https://docs.expo.io/guides/customizing-metro
const { getDefaultConfig } = require('expo/metro-config');
module.exports = getDefaultConfig(__dirname);

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,31 @@
{
"name": "bot-ready-rn",
"version": "1.0.0",
"scripts": {
"start": "expo start --dev-client",
"android": "expo run:android --device",
"ios": "expo run:ios --device",
"web": "expo start --web"
},
"dependencies": {
"@config-plugins/react-native-webrtc": "^10.0.0",
"@daily-co/config-plugin-rn-daily-js": "0.0.7",
"@daily-co/react-native-daily-js": "^0.70.0",
"@daily-co/react-native-webrtc": "^118.0.3-daily.2",
"@react-native-async-storage/async-storage": "1.23.1",
"expo": "^52.0.0",
"expo-build-properties": "~0.13.1",
"expo-dev-client": "~5.0.5",
"expo-splash-screen": "~0.29.16",
"expo-status-bar": "~2.0.0",
"react": "18.3.1",
"react-native": "0.76.3",
"react-native-background-timer": "^2.4.1",
"react-native-dotenv": "^3.4.11",
"react-native-get-random-values": "^1.11.0"
},
"devDependencies": {
"@babel/core": "^7.12.9"
},
"private": true
}

View File

@@ -0,0 +1,121 @@
import React, { useState, useEffect } from 'react';
import {SafeAreaView, View, Text, Button, StyleSheet, ScrollView} from 'react-native';
import Daily from "@daily-co/react-native-daily-js";
import { API_BASE_URL } from "@env";
const CallScreen = () => {
const [connectionStatus, setConnectionStatus] = useState('Disconnected');
const [isConnected, setIsConnected] = useState(false);
const [callObject, setCallObject] = useState(null);
const [logs, setLogs] = useState([]);
useEffect(() => {
if (callObject) {
setupTrackListeners(callObject);
}
}, [callObject]);
const log = (message) => {
setLogs((prevLogs) => [...prevLogs, `${new Date().toISOString()} - ${message}`]);
console.log(message);
};
const setupTrackListeners = (callObject) => {
callObject.on("joined-meeting", () => {
setConnectionStatus('Connected');
setIsConnected(true);
log('Client connected');
});
callObject.on("left-meeting", () => {
setConnectionStatus('Disconnected');
setIsConnected(false);
log('Client disconnected');
});
callObject.on("participant-left", () => {
// When the bot leaves, we are also disconnecting from the call
disconnect().catch((err) => {
log(`Failed to disconnect ${err}`);
})
});
// Trigger so the bot can start sending audio
callObject.on("track-started", (evt) => {
if (evt.track.kind === "audio" && evt.participant.local === false) {
handleEventToConsole(evt)
log("Sending the message that will trigger the bot to play the audio.")
callObject.sendAppMessage("playable")
}
});
callObject.on("error", (evt) => log(`Error: ${evt.error}`));
// Other events just for awareness
callObject.on("track-stopped", handleEventToConsole);
callObject.on("participant-joined", handleEventToConsole);
callObject.on("participant-updated", handleEventToConsole);
};
const handleEventToConsole = (evt) => {
log(`Received event: ${evt.action}`);
};
const connect = async () => {
try {
const callObject = Daily.createCallObject({ subscribeToTracksAutomatically: true });
setCallObject(callObject);
const connectionUrl = `${API_BASE_URL}/connect`
const res = await fetch(connectionUrl, { method: "POST", headers: { "Content-Type": "application/json" } });
const roomInfo = await res.json();
await callObject.join({ url: roomInfo.room_url });
} catch (error) {
log(`Error connecting: ${error.message}`);
}
};
const disconnect = async () => {
if (callObject) {
try {
await callObject.leave();
await callObject.destroy();
setCallObject(null);
} catch (error) {
log(`Error disconnecting: ${error.message}`);
}
}
};
return (
<SafeAreaView style={styles.safeArea}>
<View style={styles.container}>
<View style={styles.statusBar}>
<Text>Status: <Text style={styles.status}>{connectionStatus}</Text></Text>
<View style={styles.controls}>
<Button
title={isConnected ? "Disconnect" : "Connect"}
onPress={isConnected ? disconnect : connect}
/>
</View>
</View>
<View style={styles.debugPanel}>
<Text style={styles.debugTitle}>Debug Info</Text>
<ScrollView style={styles.debugLog}>
{logs.map((logEntry, index) => (
<Text key={index} style={styles.logText}>{logEntry}</Text>
))}
</ScrollView>
</View>
</View>
</SafeAreaView>
);
};
const styles = StyleSheet.create({
safeArea: { flex: 1, backgroundColor: '#f0f0f0', padding: 20 },
container: { flex: 1, margin: 20 },
statusBar: { flexDirection: 'row', justifyContent: 'space-between', alignItems: 'center', padding: 10, backgroundColor: '#fff', borderRadius: 8, marginBottom: 20 },
status: { fontWeight: 'bold' },
controls: { flexDirection: 'row', gap: 10 },
debugPanel: { height: '80%', backgroundColor: '#fff', borderRadius: 8, padding: 20},
debugTitle: { fontSize: 16, fontWeight: 'bold' },
debugLog: { height: '100%', overflow: 'scroll', backgroundColor: '#f8f8f8', padding: 10, borderRadius: 4, fontFamily: 'monospace', fontSize: 12, lineHeight: 1.4 },
});
export default CallScreen;

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -30,6 +31,12 @@ logger.add(sys.stderr, level="DEBUG")
video_participant_id = None
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
@@ -63,7 +70,7 @@ async def main():
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
llm.register_function("get_weather", get_weather)
llm.register_function("get_weather", get_weather, start_fetch_weather)
llm.register_function("get_image", get_image)
tools = [

View File

@@ -38,7 +38,6 @@ async def main():
"GStreamer",
DailyParams(
audio_out_enabled=True,
audio_out_is_live=True,
camera_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,

View File

@@ -18,12 +18,10 @@ from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
StartInterruptionFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -73,38 +71,6 @@ class DebugObserver(BaseObserver):
logger.info(f"🤖 BOT STOP SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
class LLMLogObserver(BaseObserver):
"""Observer to log LLM activity to the console.
Logs all frame instances of:
- LLMFullResponseStartFrame (only from LLM service)
- LLMTextFrame
- LLMFullResponseEndFrame (only from LLM service)
This allows you to track when the LLM starts responding, what it generates, and when it finishes.
Log format: [LLM EVENT]: [details] at [timestamp]s
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
time_sec = timestamp / 1_000_000_000
# Only log start/end frames from OpenAILLMService
if isinstance(frame, (LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
if isinstance(src, OpenAILLMService):
event = "START" if isinstance(frame, LLMFullResponseStartFrame) else "END"
logger.info(f"🧠 LLM {event} RESPONSE at {time_sec:.2f}s")
# Log all LLMTextFrames
elif isinstance(frame, LLMTextFrame):
logger.info(f"🧠 LLM GENERATING: {frame.text!r} at {time_sec:.2f}s")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)

View File

@@ -1,3 +1,5 @@
<!-- @format -->
<div align="center">
 <img alt="pipecat" width="300px" height="auto" src="image.png">
</div>
@@ -104,6 +106,21 @@ curl -X POST "http://localhost:7860/daily_start_bot" \
-d '{"dialoutNumber": "+18057145330", "detectVoicemail": true}'
```
### New! Using Gemini with Daily
We have introduced a new example file that uses Gemini. You can find the code within bot_daily_gemini.py.
If you want to spin up a Gemini-based bot for this demo, instead of an OpenAI-based bot, call the same properties above but on the `daily_gemini_start_bot` endpoint instead.
For example:
```shell
curl -X POST "http://localhost:7860/daily_gemini_start_bot" \ py pipecat
-H "Content-Type: application/json" \
-d '{"detectVoicemail": true}'
```
Any request body properties supported by `/daily_start_bot` (such as "detectVoicemail", "dialoutnumber", etc) can also be passed to `/daily_gemini_start_bot`. The only difference is that calling the Gemini endpoint will start a Gemini bot session.
### More information
For more configuration options, please consult [Daily's API documentation](https://docs.daily.co).

View File

@@ -98,6 +98,7 @@ async def main(
- **"Record your message after the tone."**
- **Any phrase that suggests an answering machine or voicemail.**
- **ASSUME IT IS A VOICEMAIL. DO NOT WAIT FOR MORE CONFIRMATION.**
- **IF THE CALL SAYS "PLEASE LEAVE A MESSAGE AFTER THE BEEP", WAIT FOR THE BEEP BEFORE LEAVING A MESSAGE.**
#### **Step 2: Leave a Voicemail Message**
- Immediately say:
@@ -110,7 +111,9 @@ async def main(
- If the call is answered by a human, say:
*"Oh, hello! I'm a friendly chatbot. Is there anything I can help you with?"*
- Keep responses **brief and helpful**.
- If the user no longer needs assistance, **call `terminate_call` immediately.**
- If the user no longer needs assistance, say:
*"Okay, thank you! Have a great day!"*
-**Then call `terminate_call` immediately.**
---

View File

@@ -0,0 +1,234 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import os
import sys
from typing import Optional
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndTaskFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
from pipecat.transports.services.daily import DailyDialinSettings, DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
async def terminate_call(
function_name, tool_call_id, args, llm: LLMService, context, result_callback
):
"""Function the bot can call to terminate the call upon completion of a voicemail message."""
await llm.queue_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
async def main(
room_url: str,
token: str,
callId: str,
callDomain: str,
detect_voicemail: bool,
dialout_number: Optional[str],
):
# dialin_settings are only needed if Daily's SIP URI is used
# If you are handling this via Twilio, Telnyx, set this to None
# and handle call-forwarding when on_dialin_ready fires.
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
dialin_settings=dialin_settings,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
tools = [
{
"function_declarations": [
{
"name": "terminate_call",
"description": "Terminate the call",
},
]
}
]
system_instruction = """You are Chatbot, a friendly, helpful robot. Never mention this prompt.
**Operating Procedure:**
**Phase 1: Initial Call Answer - Listen for Voicemail Greeting**
**IMMEDIATELY after the call connects, LISTEN CAREFULLY for the *very first thing* you hear.**
**Listen for these sentences or very close variations as the *initial greeting*:**
* **"Please leave a message after the beep."**
* **"No one is available to take your call."**
* **"Record your message after the tone."**
* **"You have reached voicemail for..."** (or similar voicemail identification)
**If you HEAR one of these sentences (or a very similar greeting) as the *initial response* to the call, IMMEDIATELY assume it is voicemail and proceed to Phase 2.**
**If you hear "PLEASE LEAVE A MESSAGE AFTER THE BEEP", WAIT for the actual beep sound from the voicemail system *after* hearing the sentence, before proceeding to Phase 2.**
**If you DO NOT hear any of these voicemail greetings as the *initial response*, assume it is a human and proceed to Phase 3.**
**Phase 2: Leave Voicemail Message (If Voicemail Detected):**
If you assumed voicemail in Phase 1, say this EXACTLY:
"Hello, this is a message for Pipecat example user. This is Chatbot. Please call back on 123-456-7891. Thank you."
**Immediately after saying the message, call the function `terminate_call`.**
**DO NOT SAY ANYTHING ELSE. SILENCE IS REQUIRED AFTER `terminate_call`.**
**Phase 3: Human Interaction (If No Voicemail Greeting Detected in Phase 1):**
If you did not detect a voicemail greeting in Phase 1 and a human answers, say:
"Oh, hello! I'm a friendly chatbot. Is there anything I can help you with?"
Keep your responses **short and helpful.**
If the human is finished, say:
"Okay, thank you! Have a great day!"
**Then, immediately call the function `terminate_call`.**
**VERY IMPORTANT RULES - DO NOT DO THESE THINGS:**
* **DO NOT SAY "Please leave a message after the beep."**
* **DO NOT SAY "No one is available to take your call."**
* **DO NOT SAY "Record your message after the tone."**
* **DO NOT SAY ANY voicemail greeting yourself.**
* **Only check for voicemail greetings in Phase 1, *immediately after the call connects*.**
* **After voicemail or human interaction, ALWAYS call `terminate_call` immediately.**
* **Do not speak after calling `terminate_call`.**
* Your speech will be audio, so use simple language without special characters.
"""
llm = GoogleLLMService(
model="models/gemini-2.0-flash-exp",
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,
)
llm.register_function("terminate_call", terminate_call)
context = GoogleLLMContext()
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
PipelineParams(allow_interruptions=True),
)
if dialout_number:
logger.debug("dialout number detected; doing dialout")
# Configure some handlers for dialing out
@transport.event_handler("on_joined")
async def on_joined(transport, data):
logger.debug(f"Joined; starting dialout to: {dialout_number}")
await transport.start_dialout({"phoneNumber": dialout_number})
@transport.event_handler("on_dialout_connected")
async def on_dialout_connected(transport, data):
logger.debug(f"Dial-out connected: {data}")
@transport.event_handler("on_dialout_answered")
async def on_dialout_answered(transport, data):
logger.debug(f"Dial-out answered: {data}")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# unlike the dialin case, for the dialout case, the caller will speak first. Presumably
# they will answer the phone and say "Hello?" Since we've captured their transcript,
# That will put a frame into the pipeline and prompt an LLM completion, which is how the
# bot will then greet the user.
elif detect_voicemail:
logger.debug("Detect voicemail example. You can test this in example in Daily Prebuilt")
# For the voicemail detection case, we do not want the bot to answer the phone. We want it to wait for the voicemail
# machine to say something like 'Leave a message after the beep', or for the user to say 'Hello?'.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
else:
logger.debug("no dialout number; assuming dialin")
# Different handlers for dialin
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# For the dialin case, we want the bot to answer the phone and greet the user. We
# can prompt the bot to speak by putting the context into the pipeline.
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Simple ChatBot")
parser.add_argument("-u", type=str, help="Room URL")
parser.add_argument("-t", type=str, help="Token")
parser.add_argument("-i", type=str, help="Call ID")
parser.add_argument("-d", type=str, help="Call Domain")
parser.add_argument("-v", action="store_true", help="Detect voicemail")
parser.add_argument("-o", type=str, help="Dialout number", default=None)
config = parser.parse_args()
asyncio.run(main(config.u, config.t, config.i, config.d, config.v, config.o))

View File

@@ -110,10 +110,15 @@ async def _create_daily_room(
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
print(f"Vendor: {vendor}")
if vendor == "daily":
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {callId} -d {callDomain}{' -v' if detect_voicemail else ''}"
if dialoutNumber:
bot_proc += f" -o {dialoutNumber}"
elif vendor == "daily-gemini":
bot_proc = f"python3 -m bot_daily_gemini -u {room.url} -t {token} -i {callId} -d {callDomain}{' -v' if detect_voicemail else ''}"
if dialoutNumber:
bot_proc += f" -o {dialoutNumber}"
else:
bot_proc = f"python3 -m bot_twilio -u {room.url} -t {token} -i {callId} -s {room.config.sip_endpoint}"
@@ -201,6 +206,38 @@ async def daily_start_bot(request: Request) -> JSONResponse:
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
@app.post("/daily_gemini_start_bot")
async def daily_gemini_start_bot(request: Request) -> JSONResponse:
# The /daily_start_bot is invoked when a call is received on Daily's SIP URI
# daily_start_bot will create the room, put the call on hold until
# the bot and sip worker are ready. Daily will automatically
# forward the call to the SIP URi when dialin_ready fires.
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", None)
# Get the dial-in properties from the request
try:
data = await request.json()
if "test" in data:
# Pass through any webhook checks
return JSONResponse({"test": True})
detect_voicemail = data.get("detectVoicemail", False)
callId = data.get("callId", None)
callDomain = data.get("callDomain", None)
dialoutNumber = data.get("dialoutNumber", None)
except Exception:
raise HTTPException(
status_code=500, detail="Missing properties 'callId', 'callDomain', or 'dialoutNumber'"
)
room: DailyRoomObject = await _create_daily_room(
room_url, callId, callDomain, dialoutNumber, "daily-gemini", detect_voicemail
)
# Grab a token for the user to join with
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
# ----------------- Main ----------------- #

View File

@@ -24,17 +24,15 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import (
DailyParams,
DailyTransport,
)
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -166,21 +164,32 @@ async def main():
async def on_transcript_update(processor, frame):
await transcript_handler.on_transcript_update(processor, frame)
rtvi = RTVIProcessor()
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
transcript.user(), # User transcripts
tp,
llm,
tts,
transport.output(),
transcript.assistant(),
context_aggregator.assistant(),
transcript.assistant(), # Assistant transcripts
]
)
task = PipelineTask(pipeline)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=False, # We don't want to interrupt the translator bot
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -1,4 +1,4 @@
python-dotenv
fastapi[all]
pipecat-ai[daily,openai,azure]
pipecat-ai[cartesia,daily,deepgram,openai,silero]
aiohttp

View File

@@ -33,7 +33,8 @@ dependencies = [
"pydantic~=2.10.5",
"pyloudnorm~=0.1.1",
"resampy~=0.4.3",
"soxr~=0.5.0"
"soxr~=0.5.0",
"openai~=1.59.6"
]
[project.urls]
@@ -44,11 +45,11 @@ Website = "https://pipecat.ai"
anthropic = [ "anthropic~=0.45.2" ]
assemblyai = [ "assemblyai~=0.36.0" ]
aws = [ "boto3~=1.35.99" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0", "openai~=1.59.6" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
canonical = [ "aiofiles~=24.1.0" ]
cartesia = [ "cartesia~=1.3.1", "websockets~=13.1" ]
cerebras = [ "openai~=1.59.6" ]
deepseek = [ "openai~=1.59.6" ]
cerebras = []
deepseek = []
daily = [ "daily-python~=0.14.2" ]
deepgram = [ "deepgram-sdk~=3.8.0" ]
elevenlabs = [ "websockets~=13.1" ]
@@ -56,10 +57,10 @@ fal = [ "fal-client~=0.5.6" ]
fish = [ "ormsgpack~=1.7.0", "websockets~=13.1" ]
gladia = [ "websockets~=13.1" ]
google = [ "google-cloud-speech~=2.31.0", "google-cloud-texttospeech~=2.25.0", "google-genai~=1.2.0", "google-generativeai~=0.8.4" ]
grok = [ "openai~=1.59.6" ]
groq = [ "openai~=1.59.6" ]
grok = []
groq = []
gstreamer = [ "pygobject~=3.50.0" ]
fireworks = [ "openai~=1.59.6" ]
fireworks = []
krisp = [ "pipecat-ai-krisp~=0.3.0" ]
koala = [ "pvkoala~=2.0.3" ]
langchain = [ "langchain~=0.3.14", "langchain-community~=0.3.14", "langchain-openai~=0.3.0" ]
@@ -67,11 +68,11 @@ livekit = [ "livekit~=0.19.1", "livekit-api~=0.8.1", "tenacity~=9.0.0" ]
lmnt = [ "websockets~=13.1" ]
local = [ "pyaudio~=0.2.14" ]
moondream = [ "einops~=0.8.0", "timm~=1.0.13", "transformers~=4.48.0" ]
nim = [ "openai~=1.59.6" ]
nim = []
noisereduce = [ "noisereduce~=3.0.3" ]
openai = [ "openai~=1.59.6", "websockets~=13.1", "python-deepcompare~=2.1.0" ]
openai = [ "websockets~=13.1" ]
openpipe = [ "openpipe~=4.45.0" ]
perplexity = [ "openai~=1.59.6" ]
perplexity = []
playht = [ "pyht~=0.1.6", "websockets~=13.1" ]
rime = [ "websockets~=13.1" ]
riva = [ "nvidia-riva-client~=2.18.0" ]
@@ -79,10 +80,10 @@ sentry = [ "sentry-sdk~=2.20.0" ]
silero = [ "onnxruntime~=1.20.1" ]
simli = [ "simli-ai~=0.1.10"]
soundfile = [ "soundfile~=0.13.0" ]
together = [ "openai~=1.59.6" ]
together = []
websocket = [ "websockets~=13.1", "fastapi~=0.115.6" ]
whisper = [ "faster-whisper~=1.1.1" ]
openrouter = [ "openai~=1.59.6" ]
openrouter = []
[tool.setuptools.packages.find]
# All the following settings are optional:

View File

@@ -0,0 +1,85 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from loguru import logger
from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
LLMTextFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.ai_services import LLMService
class LLMLogObserver(BaseObserver):
"""Observer to log LLM activity to the console.
Logs all frame instances (only from/to LLM service) of:
- LLMFullResponseStartFrame
- LLMFullResponseEndFrame
- LLMTextFrame
- FunctionCallInProgressFrame
- LLMMessagesFrame
- OpenAILLMContextFrame
This allows you to track when the LLM starts responding, what it generates,
and when it finishes.
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
if not isinstance(src, LLMService) and not isinstance(dst, LLMService):
return
time_sec = timestamp / 1_000_000_000
arrow = ""
# Log LLM start/end frames (output)
if isinstance(frame, (LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
event = "START" if isinstance(frame, LLMFullResponseStartFrame) else "END"
logger.debug(f"🧠 {src} {arrow} LLM {event} RESPONSE at {time_sec:.2f}s")
# Log all LLMTextFrames (output)
elif isinstance(frame, LLMTextFrame):
logger.debug(f"🧠 {src} {arrow} LLM GENERATING: {frame.text!r} at {time_sec:.2f}s")
# Log function calling (output)
elif (
isinstance(frame, FunctionCallInProgressFrame)
and direction != FrameDirection.DOWNSTREAM
):
logger.debug(
f"🧠 {src} {arrow} LLM FUNCTION CALL ({frame.tool_call_id}): {frame.function_name!r}({frame.arguments}) at {time_sec:.2f}s"
)
# Log LLMMessagesFrame (input)
elif isinstance(frame, LLMMessagesFrame):
logger.debug(
f"🧠 {arrow} {dst} LLM MESSAGES FRAME: {frame.messages} at {time_sec:.2f}s"
)
# Log OpenAILLMContextFrame (input)
elif isinstance(frame, OpenAILLMContextFrame):
logger.debug(
f"🧠 {arrow} {dst} LLM CONTEXT FRAME: {frame.context.messages} at {time_sec:.2f}s"
)
# Log function call result (input)
elif isinstance(frame, FunctionCallResultFrame):
logger.debug(
f"🧠 {arrow} {src} LLM FUNCTION CALL RESULT ({frame.tool_call_id}): {frame.result} at {time_sec:.2f}s"
)

View File

@@ -0,0 +1,54 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from loguru import logger
from pipecat.frames.frames import (
Frame,
InterimTranscriptionFrame,
TranscriptionFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.ai_services import STTService
class TranscriptionLogObserver(BaseObserver):
"""Observer to log transcription activity to the console.
Logs all frame instances (only from STT service) of:
- TranscriptionFrame
- InterimTranscriptionFrame
This allows you to track when the LLM starts responding, what it generates,
and when it finishes.
"""
async def on_push_frame(
self,
src: FrameProcessor,
dst: FrameProcessor,
frame: Frame,
direction: FrameDirection,
timestamp: int,
):
if not isinstance(src, STTService):
return
time_sec = timestamp / 1_000_000_000
arrow = ""
if isinstance(frame, TranscriptionFrame):
logger.debug(
f"💬 {src} {arrow} TRANSCRIPTION: {frame.text!r} from {frame.user_id!r} at {time_sec:.2f}s"
)
elif isinstance(frame, InterimTranscriptionFrame):
logger.debug(
f"💬 {src} {arrow} INTERIM TRANSCRIPTION: {frame.text!r} from {frame.user_id!r} at {time_sec:.2f}s"
)

View File

@@ -145,6 +145,9 @@ class LLMResponseAggregator(BaseLLMResponseAggregator):
frame = LLMMessagesFrame(self._messages)
await self.push_frame(frame)
# Reset our accumulator state.
self.reset()
class LLMContextResponseAggregator(BaseLLMResponseAggregator):
"""This is a base LLM aggregator that uses an LLM context to store the
@@ -290,7 +293,13 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
await self.push_aggregation()
async def _handle_transcription(self, frame: TranscriptionFrame):
self._aggregation += f" {frame.text}" if self._aggregation else frame.text
text = frame.text
# Make sure we really have some text.
if not text.strip():
return
self._aggregation += f" {text}" if self._aggregation else text
# We just got a final result, so let's reset interim results.
self._seen_interim_results = False
# Reset aggregation timer.
@@ -298,8 +307,6 @@ class LLMUserContextAggregator(LLMContextResponseAggregator):
async def _handle_interim_transcription(self, _: InterimTranscriptionFrame):
self._seen_interim_results = True
# Reset aggregation timer.
self._aggregation_event.set()
def _create_aggregation_task(self):
self._aggregation_task = self.create_task(self._aggregation_task_handler())

View File

@@ -12,6 +12,12 @@ from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List, Optional
from loguru import logger
from openai._types import NOT_GIVEN, NotGiven
from openai.types.chat import (
ChatCompletionMessageParam,
ChatCompletionToolChoiceOptionParam,
ChatCompletionToolParam,
)
from PIL import Image
from pipecat.frames.frames import (
@@ -22,20 +28,6 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
try:
from openai._types import NOT_GIVEN, NotGiven
from openai.types.chat import (
ChatCompletionMessageParam,
ChatCompletionToolChoiceOptionParam,
ChatCompletionToolParam,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
)
raise Exception(f"Missing module: {e}")
# JSON custom encoder to handle bytes arrays so that we can log contexts
# with images to the console.

View File

@@ -22,9 +22,8 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class AudioBufferProcessor(FrameProcessor):
"""This processor buffers audio raw frames (input and output). The mixed
audio can be obtained by calling `get_audio()` (if `buffer_size` is 0) or by
registering an "on_audio_data" event handler. The event handler will be
called every time `buffer_size` is reached.
audio can be obtained by registering an "on_audio_data" event handler.
The event handler will be called every time `buffer_size` is reached.
You can provide the desired output `sample_rate` and incoming audio frames
will resampled to match it. Also, you can provide the number of channels, 1

View File

@@ -23,6 +23,7 @@ from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InputAudioRawFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
@@ -185,13 +186,14 @@ class STTMuteFilter(FrameProcessor):
StopInterruptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
InputAudioRawFrame,
),
):
# Only pass VAD-related frames when not muted
if not self.is_muted:
await self.push_frame(frame, direction)
else:
logger.debug(f"{frame.__class__.__name__} suppressed - STT currently muted")
logger.trace(f"{frame.__class__.__name__} suppressed - STT currently muted")
else:
# Pass all other frames through
await self.push_frame(frame, direction)

View File

@@ -15,6 +15,7 @@ from loguru import logger
from pipecat.audio.utils import calculate_audio_volume, exp_smoothing
from pipecat.frames.frames import (
AudioRawFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
@@ -40,6 +41,7 @@ from pipecat.frames.frames import (
from pipecat.metrics.metrics import MetricsData
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language
from pipecat.utils.string import match_endofsentence
from pipecat.utils.text.base_text_filter import BaseTextFilter
@@ -175,6 +177,7 @@ class LLMService(AIService):
f = self._callbacks[None]
else:
return None
await self.call_start_function(context, function_name)
await context.call_function(
f,
function_name=function_name,
@@ -208,7 +211,7 @@ class TTSService(AIService):
# if True, TTSService will push TTSStoppedFrames, otherwise subclass must do it
push_stop_frames: bool = False,
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
stop_frame_timeout_s: float = 1.0,
stop_frame_timeout_s: float = 2.0,
# if True, TTSService will push silence audio frames after TTSStoppedFrame
push_silence_after_stop: bool = False,
# if push_silence_after_stop is True, send this amount of audio silence
@@ -433,6 +436,12 @@ class TTSService(AIService):
class WordTTSService(TTSService):
"""This is a base class for TTS services that support word timestamps. Word
timestamps are useful to synchronize audio with text of the spoken
words. This way only the spoken words are added to the conversation context.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._initial_word_timestamp = -1
@@ -502,11 +511,93 @@ class WordTTSService(TTSService):
self._words_queue.task_done()
class AudioContextWordTTSService(WordTTSService):
"""This services allow us to send multiple TTS request to the services. Each
request could be multiple sentences long which are grouped by context. For
this to work, the TTS service needs to support handling multiple requests at
once (i.e. multiple simultaneous contexts).
class WebsocketTTSService(TTSService, WebsocketService):
"""This is a base class for websocket-based TTS services."""
def __init__(self, **kwargs):
TTSService.__init__(self, **kwargs)
WebsocketService.__init__(self)
class InterruptibleTTSService(WebsocketTTSService):
"""This is a base class for websocket-based TTS services that don't support
word timestamps and that don't offer a way to correlate the generated audio
to the requested text.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Indicates if the bot is speaking. If the bot is not speaking we don't
# need to reconnect when the user speaks. If the bot is speaking and the
# user interrupts we need to reconnect.
self._bot_speaking = False
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
if self._bot_speaking:
await self._disconnect()
await self._connect()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, BotStartedSpeakingFrame):
self._bot_speaking = True
elif isinstance(frame, BotStoppedSpeakingFrame):
self._bot_speaking = False
class WebsocketWordTTSService(WordTTSService, WebsocketService):
"""This is a base class for websocket-based TTS services that support word
timestamps.
"""
def __init__(self, **kwargs):
WordTTSService.__init__(self, **kwargs)
WebsocketService.__init__(self)
class InterruptibleWordTTSService(WebsocketWordTTSService):
"""This is a base class for websocket-based TTS services that support word
timestamps but don't offer a way to correlate the generated audio to the
requested text.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Indicates if the bot is speaking. If the bot is not speaking we don't
# need to reconnect when the user speaks. If the bot is speaking and the
# user interrupts we need to reconnect.
self._bot_speaking = False
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
if self._bot_speaking:
await self._disconnect()
await self._connect()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, BotStartedSpeakingFrame):
self._bot_speaking = True
elif isinstance(frame, BotStoppedSpeakingFrame):
self._bot_speaking = False
class AudioContextWordTTSService(WebsocketWordTTSService):
"""This is a base class for websocket-based TTS services that support word
timestamps and also allow correlating the generated audio with the requested
text.
Each request could be multiple sentences long which are grouped by
context. For this to work, the TTS service needs to support handling
multiple requests at once (i.e. multiple simultaneous contexts).
The audio received from the TTS will be played in context order. That is, if
we requested audio for a context "A" and then audio for context "B", the

View File

@@ -96,7 +96,7 @@ class AnthropicLLMService(LLMService):
self,
*,
api_key: str,
model: str = "claude-3-5-sonnet-20241022",
model: str = "claude-3-7-sonnet-20250219",
params: InputParams = InputParams(),
client=None,
**kwargs,
@@ -743,18 +743,19 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
run_llm = False
properties: Optional[FunctionCallResultProperties] = None
aggregation = self._aggregation
aggregation = self._aggregation.strip()
self.reset()
try:
if aggregation:
self._context.add_message({"role": "assistant", "content": aggregation})
if self._function_call_result:
frame = self._function_call_result
properties = frame.properties
self._function_call_result = None
if frame.result:
assistant_message = {"role": "assistant", "content": []}
if aggregation:
assistant_message["content"].append({"type": "text", "text": aggregation})
assistant_message["content"].append(
{
"type": "tool_use",
@@ -782,8 +783,6 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
else:
# Default behavior
run_llm = True
elif aggregation:
self._context.add_message({"role": "assistant", "content": aggregation})
if self._pending_image_frame_message:
frame = self._pending_image_frame_message

View File

@@ -10,6 +10,7 @@ from typing import AsyncGenerator, Optional
import aiohttp
from loguru import logger
from openai import AsyncAzureOpenAI
from PIL import Image
from pydantic import BaseModel
@@ -48,7 +49,6 @@ try:
PushAudioInputStream,
)
from azure.cognitiveservices.speech.dialog import AudioConfig
from openai import AsyncAzureOpenAI
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(

View File

@@ -7,22 +7,14 @@
from typing import AsyncGenerator, Optional
from loguru import logger
from openai import AsyncOpenAI
from openai.types.audio import Transcription
from pipecat.frames.frames import ErrorFrame, Frame, TranscriptionFrame
from pipecat.services.ai_services import SegmentedSTTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
try:
from openai import AsyncOpenAI
from openai.types.audio import Transcription
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
)
raise Exception(f"Missing module: {e}")
def language_to_whisper_language(language: Language) -> Optional[str]:
"""Language support for Whisper API.

View File

@@ -13,22 +13,18 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AudioContextWordTTSService, TTSService
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language
# See .env.example for Cartesia configuration needed
@@ -75,7 +71,7 @@ def language_to_cartesia_language(language: Language) -> Optional[str]:
return result
class CartesiaTTSService(AudioContextWordTTSService, WebsocketService):
class CartesiaTTSService(AudioContextWordTTSService):
class InputParams(BaseModel):
language: Optional[Language] = Language.EN
speed: Optional[Union[str, float]] = ""
@@ -105,15 +101,13 @@ class CartesiaTTSService(AudioContextWordTTSService, WebsocketService):
# if we're interrupted. Cartesia gives us word-by-word timestamps. We
# can use those to generate text frames ourselves aligned with the
# playout timing of the audio!
AudioContextWordTTSService.__init__(
self,
super().__init__(
aggregate_sentences=True,
push_text_frames=False,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,
)
WebsocketService.__init__(self)
self._api_key = api_key
self._cartesia_version = cartesia_version
@@ -364,9 +358,6 @@ class CartesiaHttpTTSService(TTSService):
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
await self.start_ttfb_metrics()
yield TTSStartedFrame()
try:
voice_controls = None
if self._settings["speed"] or self._settings["emotion"]:
@@ -376,6 +367,8 @@ class CartesiaHttpTTSService(TTSService):
if self._settings["emotion"]:
voice_controls["emotion"] = self._settings["emotion"]
await self.start_ttfb_metrics()
output = await self._client.tts.sse(
model_id=self._model_name,
transcript=text,
@@ -386,14 +379,17 @@ class CartesiaHttpTTSService(TTSService):
_experimental_voice_controls=voice_controls,
)
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
frame = TTSAudioRawFrame(
audio=output["audio"], sample_rate=self.sample_rate, num_channels=1
)
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.start_tts_usage_metrics(text)
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -7,22 +7,12 @@
from typing import List
from loguru import logger
from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
try:
from openai import (
AsyncStream,
)
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Cerebras, you need to `pip install pipecat-ai[cerebras]`. Also, set `CEREBRAS_API_KEY` environment variable."
)
raise Exception(f"Missing module: {e}")
class CerebrasLLMService(OpenAILLMService):
"""A service for interacting with Cerebras's API using the OpenAI-compatible interface.

View File

@@ -5,7 +5,7 @@
#
import asyncio
from typing import AsyncGenerator, Optional
from typing import AsyncGenerator, Dict, Optional
from loguru import logger
@@ -34,6 +34,7 @@ try:
AsyncListenWebSocketClient,
DeepgramClient,
DeepgramClientOptions,
ErrorResponse,
LiveOptions,
LiveResultResponse,
LiveTranscriptionEvents,
@@ -120,14 +121,16 @@ class DeepgramSTTService(STTService):
url: str = "",
sample_rate: Optional[int] = None,
live_options: Optional[LiveOptions] = None,
addons: Optional[Dict] = None,
**kwargs,
):
sample_rate = sample_rate or (live_options.sample_rate if live_options else None)
super().__init__(sample_rate=sample_rate, **kwargs)
default_options = LiveOptions(
encoding="linear16",
language=Language.EN,
model="nova-2-general",
model="nova-3-general",
channels=1,
interim_results=True,
smart_format=True,
@@ -147,6 +150,7 @@ class DeepgramSTTService(STTService):
merged_options.language = merged_options.language.value
self._settings = merged_options.to_dict()
self._addons = addons
self._client = DeepgramClient(
api_key,
@@ -155,13 +159,10 @@ class DeepgramSTTService(STTService):
options={"keepalive": "true"}, # verbose=logging.DEBUG
),
)
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
if self.vad_enabled:
self._register_event_handler("on_speech_started")
self._register_event_handler("on_utterance_end")
self._connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)
self._connection.on(LiveTranscriptionEvents.UtteranceEnd, self._on_utterance_end)
@property
def vad_enabled(self):
@@ -202,7 +203,25 @@ class DeepgramSTTService(STTService):
async def _connect(self):
logger.debug("Connecting to Deepgram")
if not await self._connection.start(self._settings):
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), self._on_message
)
self._connection.on(LiveTranscriptionEvents(LiveTranscriptionEvents.Error), self._on_error)
if self.vad_enabled:
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted),
self._on_speech_started,
)
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd),
self._on_utterance_end,
)
if not await self._connection.start(options=self._settings, addons=self._addons):
logger.error(f"{self}: unable to connect to Deepgram")
async def _disconnect(self):
@@ -214,6 +233,15 @@ class DeepgramSTTService(STTService):
await self.start_ttfb_metrics()
await self.start_processing_metrics()
async def _on_error(self, *args, **kwargs):
error: ErrorResponse = kwargs["error"]
logger.warning(f"{self} connection error, will retry: {error}")
await self.stop_all_metrics()
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
# because this triggers more errors internally in the Deepgram SDK. So,
# we just forget about the previous connection and create a new one.
await self._connect()
async def _on_speech_started(self, *args, **kwargs):
await self.start_metrics()
await self._call_event_handler("on_speech_started", *args, **kwargs)

View File

@@ -8,22 +8,12 @@
from typing import List
from loguru import logger
from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
try:
from openai import (
AsyncStream,
)
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use DeepSeek, you need to `pip install pipecat-ai[deepseek]`. Also, set `DEEPSEEK_API_KEY` environment variable."
)
raise Exception(f"Missing module: {e}")
class DeepSeekLLMService(OpenAILLMService):
"""A service for interacting with DeepSeek's API using the OpenAI-compatible interface.

View File

@@ -14,22 +14,18 @@ from loguru import logger
from pydantic import BaseModel, model_validator
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import TTSService, WordTTSService
from pipecat.services.websocket_service import WebsocketService
from pipecat.services.ai_services import InterruptibleWordTTSService, TTSService
from pipecat.transcriptions.language import Language
# See .env.example for ElevenLabs configuration needed
@@ -141,7 +137,7 @@ def calculate_word_times(
return word_times
class ElevenLabsTTSService(WordTTSService, WebsocketService):
class ElevenLabsTTSService(InterruptibleWordTTSService):
class InputParams(BaseModel):
language: Optional[Language] = None
optimize_streaming_latency: Optional[str] = None
@@ -186,17 +182,14 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
# Finally, ElevenLabs doesn't provide information on when the bot stops
# speaking for a while, so we want the parent class to send TTSStopFrame
# after a short period not receiving any audio.
WordTTSService.__init__(
self,
super().__init__(
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,
)
WebsocketService.__init__(self)
self._api_key = api_key
self._url = url
@@ -567,18 +560,16 @@ class ElevenLabsHttpTTSService(TTSService):
return
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
async for chunk in response.content:
if chunk:
await self.stop_ttfb_metrics()
yield TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"Error in run_tts: {e}")
yield ErrorFrame(error=str(e))
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -8,19 +8,11 @@
from typing import List
from loguru import logger
from openai.types.chat import ChatCompletionMessageParam
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
try:
from openai.types.chat import ChatCompletionMessageParam
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Fireworks, you need to `pip install pipecat-ai[fireworks]`. Also, set `FIREWORKS_API_KEY` environment variable."
)
raise Exception(f"Missing module: {e}")
class FireworksLLMService(OpenAILLMService):
"""A service for interacting with Fireworks AI using the OpenAI-compatible interface.

View File

@@ -11,22 +11,18 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import TTSService
from pipecat.services.websocket_service import WebsocketService
from pipecat.services.ai_services import InterruptibleTTSService
from pipecat.transcriptions.language import Language
try:
@@ -43,7 +39,7 @@ except ModuleNotFoundError as e:
FishAudioOutputFormat = Literal["opus", "mp3", "pcm", "wav"]
class FishAudioTTSService(TTSService, WebsocketService):
class FishAudioTTSService(InterruptibleTTSService):
class InputParams(BaseModel):
language: Optional[Language] = Language.EN
latency: Optional[str] = "normal" # "normal" or "balanced"
@@ -60,7 +56,12 @@ class FishAudioTTSService(TTSService, WebsocketService):
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(pause_frame_processing=True, sample_rate=sample_rate, **kwargs)
super().__init__(
push_stop_frames=True,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,
)
self._api_key = api_key
self._base_url = "wss://api.fish.audio/v1/tts/live"
@@ -108,11 +109,12 @@ class FishAudioTTSService(TTSService, WebsocketService):
self._receive_task = self.create_task(self._receive_task_handler(self.push_error))
async def _disconnect(self):
await self._disconnect_websocket()
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
try:
logger.debug("Connecting to Fish Audio")
@@ -147,6 +149,11 @@ class FishAudioTTSService(TTSService, WebsocketService):
return self._websocket
raise Exception("Websocket not connected")
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
await self.stop_all_metrics()
self._request_id = None
async def _receive_messages(self):
async for message in self._get_websocket():
try:
@@ -166,11 +173,6 @@ class FishAudioTTSService(TTSService, WebsocketService):
except Exception as e:
logger.error(f"Error processing message: {e}")
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
await super()._handle_interruption(frame, direction)
await self.stop_all_metrics()
self._request_id = None
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating Fish TTS: [{text}]")
try:

View File

@@ -565,10 +565,15 @@ class GoogleAssistantContextAggregator(OpenAIAssistantContextAggregator):
run_llm = False
properties: Optional[FunctionCallResultProperties] = None
aggregation = self._aggregation
aggregation = self._aggregation.strip()
self.reset()
try:
if aggregation:
self._context.add_message(
glm.Content(role="model", parts=[glm.Part(text=aggregation)])
)
if self._function_call_result:
frame = self._function_call_result
properties = frame.properties
@@ -608,11 +613,6 @@ class GoogleAssistantContextAggregator(OpenAIAssistantContextAggregator):
else:
# Default behavior is to run the LLM if there are no function calls in progress
run_llm = not bool(self._function_calls_in_progress)
else:
if aggregation.strip():
self._context.add_message(
glm.Content(role="model", parts=[glm.Part(text=aggregation)])
)
if self._pending_image_frame_message:
frame = self._pending_image_frame_message

View File

@@ -37,10 +37,13 @@ class GrokAssistantContextAggregator(OpenAIAssistantContextAggregator):
run_llm = False
properties: Optional[FunctionCallResultProperties] = None
aggregation = self._aggregation
aggregation = self._aggregation.strip()
self.reset()
try:
if aggregation:
self._context.add_message({"role": "assistant", "content": aggregation})
if self._function_call_result:
frame = self._function_call_result
properties = frame.properties
@@ -77,9 +80,6 @@ class GrokAssistantContextAggregator(OpenAIAssistantContextAggregator):
# Default behavior is to run the LLM if there are no function calls in progress
run_llm = not bool(self._function_calls_in_progress)
else:
self._context.add_message({"role": "assistant", "content": aggregation})
if self._pending_image_frame_message:
frame = self._pending_image_frame_message
self._pending_image_frame_message = None

View File

@@ -21,8 +21,7 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import TTSService
from pipecat.services.websocket_service import WebsocketService
from pipecat.services.ai_services import InterruptibleTTSService
from pipecat.transcriptions.language import Language
# See .env.example for LMNT configuration needed
@@ -60,7 +59,7 @@ def language_to_lmnt_language(language: Language) -> Optional[str]:
return result
class LmntTTSService(TTSService, WebsocketService):
class LmntTTSService(InterruptibleTTSService):
def __init__(
self,
*,
@@ -70,14 +69,12 @@ class LmntTTSService(TTSService, WebsocketService):
language: Language = Language.EN,
**kwargs,
):
TTSService.__init__(
self,
super().__init__(
push_stop_frames=True,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,
)
WebsocketService.__init__(self)
self._api_key = api_key
self._voice_id = voice_id
@@ -116,12 +113,12 @@ class LmntTTSService(TTSService, WebsocketService):
self._receive_task = self.create_task(self._receive_task_handler(self.push_error))
async def _disconnect(self):
await self._disconnect_websocket()
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
"""Connect to LMNT websocket."""
try:
@@ -153,8 +150,9 @@ class LmntTTSService(TTSService, WebsocketService):
if self._websocket:
logger.debug("Disconnecting from LMNT")
# Send EOF message before closing
await self._websocket.send(json.dumps({"eof": True}))
# NOTE(aleix): sending EOF message before closing is causing
# errors on the websocket, so we just skip it for now.
# await self._websocket.send(json.dumps({"eof": True}))
await self._websocket.close()
self._websocket = None

View File

@@ -13,6 +13,14 @@ from typing import Any, AsyncGenerator, Dict, List, Literal, Optional
import aiohttp
import httpx
from loguru import logger
from openai import (
NOT_GIVEN,
AsyncOpenAI,
AsyncStream,
BadRequestError,
DefaultAsyncHttpxClient,
)
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from PIL import Image
from pydantic import BaseModel, Field
@@ -57,23 +65,6 @@ from pipecat.services.base_whisper import BaseWhisperSTTService, Transcription
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
try:
from openai import (
NOT_GIVEN,
AsyncOpenAI,
AsyncStream,
BadRequestError,
DefaultAsyncHttpxClient,
)
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
)
raise Exception(f"Missing module: {e}")
ValidVoice = Literal["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
VALID_VOICES: Dict[str, ValidVoice] = {
@@ -266,7 +257,6 @@ class BaseOpenAILLMService(LLMService):
if tool_call.function and tool_call.function.name:
function_name += tool_call.function.name
tool_call_id = tool_call.id
await self.call_start_function(context, function_name)
if tool_call.function and tool_call.function.arguments:
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
@@ -632,10 +622,13 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
run_llm = False
properties: Optional[FunctionCallResultProperties] = None
aggregation = self._aggregation
aggregation = self._aggregation.strip()
self.reset()
try:
if aggregation:
self._context.add_message({"role": "assistant", "content": aggregation})
if self._function_call_result:
frame = self._function_call_result
properties = frame.properties
@@ -670,9 +663,6 @@ class OpenAIAssistantContextAggregator(LLMAssistantContextAggregator):
# Default behavior is to run the LLM if there are no function calls in progress
run_llm = not bool(self._function_calls_in_progress)
else:
self._context.add_message({"role": "assistant", "content": aggregation})
if self._pending_image_frame_message:
frame = self._pending_image_frame_message
self._pending_image_frame_message = None

View File

@@ -10,9 +10,17 @@ import json
import time
from dataclasses import dataclass
import websockets
from loguru import logger
try:
import websockets
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
)
raise Exception(f"Missing module: {e}")
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,

View File

@@ -7,12 +7,12 @@
from typing import Dict, List, Optional
from loguru import logger
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
try:
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from openpipe import AsyncOpenAI as OpenPipeAI
from openpipe import AsyncStream
except ModuleNotFoundError as e:

View File

@@ -7,24 +7,13 @@
from typing import List
from loguru import logger
from openai import NOT_GIVEN, AsyncStream
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
try:
from openai import (
NOT_GIVEN,
AsyncStream,
)
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Perplexity, you need to `pip install pipecat-ai[perplexity]`. Also, set `PERPLEXITY_API_KEY` environment variable."
)
raise Exception(f"Missing module: {e}")
class PerplexityLLMService(OpenAILLMService):
"""A service for interacting with Perplexity's API.

View File

@@ -16,22 +16,18 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import TTSService
from pipecat.services.websocket_service import WebsocketService
from pipecat.services.ai_services import InterruptibleTTSService, TTSService
from pipecat.transcriptions.language import Language
try:
@@ -100,7 +96,7 @@ def language_to_playht_language(language: Language) -> Optional[str]:
return result
class PlayHTTTSService(TTSService, WebsocketService):
class PlayHTTTSService(InterruptibleTTSService):
class InputParams(BaseModel):
language: Optional[Language] = Language.EN
speed: Optional[float] = 1.0
@@ -118,13 +114,11 @@ class PlayHTTTSService(TTSService, WebsocketService):
params: InputParams = InputParams(),
**kwargs,
):
TTSService.__init__(
self,
super().__init__(
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,
)
WebsocketService.__init__(self)
self._api_key = api_key
self._user_id = user_id
@@ -168,12 +162,12 @@ class PlayHTTTSService(TTSService, WebsocketService):
self._receive_task = self.create_task(self._receive_task_handler(self.push_error))
async def _disconnect(self):
await self._disconnect_websocket()
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
try:
logger.debug("Connecting to PlayHT")
@@ -397,6 +391,7 @@ class PlayHTHttpTTSService(TTSService):
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
async for chunk in playht_gen:
# skip the RIFF header.
if in_header:
@@ -416,6 +411,8 @@ class PlayHTHttpTTSService(TTSService):
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} error generating TTS: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -14,22 +14,18 @@ from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AudioContextWordTTSService, TTSService
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language
try:
@@ -58,7 +54,7 @@ def language_to_rime_language(language: Language) -> str:
return LANGUAGE_MAP.get(language, "eng")
class RimeTTSService(AudioContextWordTTSService, WebsocketService):
class RimeTTSService(AudioContextWordTTSService):
"""Text-to-Speech service using Rime's websocket API.
Uses Rime's websocket JSON API to convert text to speech with word-level timing
@@ -95,17 +91,14 @@ class RimeTTSService(AudioContextWordTTSService, WebsocketService):
params: Additional configuration parameters.
"""
# Initialize with parent class settings for proper frame handling
AudioContextWordTTSService.__init__(
self,
super().__init__(
aggregate_sentences=True,
push_text_frames=False,
push_stop_frames=True,
stop_frame_timeout_s=2.0,
pause_frame_processing=True,
sample_rate=sample_rate,
**kwargs,
)
WebsocketService.__init__(self)
# Store service configuration
self._api_key = api_key
@@ -176,11 +169,12 @@ class RimeTTSService(AudioContextWordTTSService, WebsocketService):
async def _disconnect(self):
"""Close websocket connection and clean up tasks."""
await self._disconnect_websocket()
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
"""Connect to Rime websocket API with configured settings."""
try:
@@ -250,7 +244,9 @@ class RimeTTSService(AudioContextWordTTSService, WebsocketService):
async def flush_audio(self):
if not self._context_id or not self._websocket:
return
logger.trace(f"{self}: flushing audio")
await self._get_websocket().send(json.dumps({"text": " "}))
self._context_id = None
async def _receive_messages(self):
@@ -349,7 +345,8 @@ class RimeHttpTTSService(TTSService):
self,
*,
api_key: str,
voice_id: str = "eva",
voice_id: str,
aiohttp_session: aiohttp.ClientSession,
model: str = "mistv2",
sample_rate: Optional[int] = None,
params: InputParams = InputParams(),
@@ -358,6 +355,7 @@ class RimeHttpTTSService(TTSService):
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._session = aiohttp_session
self._base_url = "https://users.rime.ai/v1/rime-tts"
self._settings = {
"speedAlpha": params.speed_alpha,
@@ -391,36 +389,31 @@ class RimeHttpTTSService(TTSService):
try:
await self.start_ttfb_metrics()
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
async with self._session.post(
self._base_url, json=payload, headers=headers
) as response:
if response.status != 200:
error_message = f"Rime TTS error: HTTP {response.status}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
async with aiohttp.ClientSession() as session:
async with session.post(self._base_url, json=payload, headers=headers) as response:
if response.status != 200:
error_message = f"Rime TTS error: HTTP {response.status}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
await self.start_tts_usage_metrics(text)
# Process the streaming response
chunk_size = 8192
first_chunk = True
yield TTSStartedFrame()
async for chunk in response.content.iter_chunked(chunk_size):
if first_chunk:
await self.stop_ttfb_metrics()
first_chunk = False
if chunk:
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
yield TTSStoppedFrame()
# Process the streaming response
chunk_size = 8192
async for chunk in response.content.iter_chunked(chunk_size):
if chunk:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self.sample_rate, 1)
yield frame
except Exception as e:
logger.exception(f"Error generating TTS: {e}")
yield ErrorFrame(error=f"Rime TTS error: {str(e)}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -13,6 +13,7 @@ from loguru import logger
from websockets.protocol import State
from pipecat.frames.frames import ErrorFrame
from pipecat.utils.network import exponential_backoff_time
class WebsocketService(ABC):
@@ -51,27 +52,6 @@ class WebsocketService(ABC):
await self._connect_websocket()
return await self._verify_connection()
def _calculate_wait_time(
self, attempt: int, min_wait: float = 4, max_wait: float = 10, multiplier: float = 1
) -> float:
"""Calculate exponential backoff wait time.
Args:
attempt: Current attempt number (1-based)
min_wait: Minimum wait time in seconds
max_wait: Maximum wait time in seconds
multiplier: Base multiplier for exponential calculation
Returns:
Wait time in seconds
"""
try:
exp = 2 ** (attempt - 1) * multiplier
result = max(0, min(exp, max_wait))
return max(min_wait, result)
except (ValueError, ArithmeticError):
return max_wait
async def _receive_task_handler(self, report_error: Callable[[ErrorFrame], Awaitable[None]]):
"""Handles WebSocket message receiving with automatic retry logic.
@@ -104,20 +84,38 @@ class WebsocketService(ABC):
try:
if await self._reconnect_websocket(retry_count):
retry_count = 0 # Reset counter on successful reconnection
wait_time = self._calculate_wait_time(retry_count)
wait_time = exponential_backoff_time(retry_count)
await asyncio.sleep(wait_time)
except Exception as reconnect_error:
logger.error(f"{self} reconnection failed: {reconnect_error}")
continue
@abstractmethod
async def _connect(self):
"""Implement service-specific connection logic. This function will
connect to the websocket via _connect_websocket() among other connection
logic."""
pass
@abstractmethod
async def _disconnect(self):
"""Implement service-specific disconnection logic. This function will
disconnect to the websocket via _connect_websocket() among other
connection logic.
"""
pass
@abstractmethod
async def _connect_websocket(self):
"""Implement service-specific websocket connection logic."""
"""Implement service-specific websocket connection logic. This function
should only connect to the websocket."""
pass
@abstractmethod
async def _disconnect_websocket(self):
"""Implement service-specific websocket disconnection logic."""
"""Implement service-specific websocket disconnection logic. This
function should only disconnect from the websocket."""
pass
@abstractmethod

View File

@@ -174,11 +174,9 @@ class BaseInputTransport(FrameProcessor):
async def _vad_analyze(self, audio_frame: InputAudioRawFrame) -> VADState:
state = VADState.QUIET
if self.vad_analyzer:
logger.trace(f"{self}: analyzing VAD on {audio_frame}")
state = await self.get_event_loop().run_in_executor(
self._executor, self.vad_analyzer.analyze_audio, audio_frame.audio
)
logger.trace(f"{self}: done analyzing VAD on {audio_frame}")
return state
async def _handle_vad(self, audio_frame: InputAudioRawFrame, vad_state: VADState):

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import inspect
from abc import ABC, abstractmethod
from typing import Optional
@@ -30,7 +29,6 @@ class TransportParams(BaseModel):
camera_out_framerate: int = 30
camera_out_color_format: str = "RGB"
audio_out_enabled: bool = False
audio_out_is_live: bool = False
audio_out_sample_rate: Optional[int] = None
audio_out_channels: int = 1
audio_out_bitrate: int = 96000

View File

@@ -55,45 +55,89 @@ class FastAPIWebsocketCallbacks(BaseModel):
on_session_timeout: Callable[[WebSocket], Awaitable[None]]
class FastAPIWebsocketClient:
def __init__(self, websocket: WebSocket, is_binary: bool, callbacks: FastAPIWebsocketCallbacks):
self._websocket = websocket
self._closing = False
self._is_binary = is_binary
self._callbacks = callbacks
def receive(self) -> typing.AsyncIterator[bytes | str]:
return self._websocket.iter_bytes() if self._is_binary else self._websocket.iter_text()
async def send(self, data: str | bytes):
if self._can_send():
if self._is_binary:
await self._websocket.send_bytes(data)
else:
await self._websocket.send_text(data)
async def disconnect(self):
if self.is_connected and not self.is_closing:
self._closing = True
await self._websocket.close()
await self.trigger_client_disconnected()
async def trigger_client_disconnected(self):
await self._callbacks.on_client_disconnected(self._websocket)
async def trigger_client_connected(self):
await self._callbacks.on_client_connected(self._websocket)
async def trigger_client_timout(self):
await self._callbacks.on_session_timeout(self._websocket)
def _can_send(self):
return self.is_connected and not self.is_closing
@property
def is_connected(self) -> bool:
return self._websocket.client_state == WebSocketState.CONNECTED
@property
def is_closing(self) -> bool:
return self._closing
class FastAPIWebsocketInputTransport(BaseInputTransport):
def __init__(
self,
websocket: WebSocket,
client: FastAPIWebsocketClient,
params: FastAPIWebsocketParams,
callbacks: FastAPIWebsocketCallbacks,
**kwargs,
):
super().__init__(params, **kwargs)
self._websocket = websocket
self._client = client
self._params = params
self._callbacks = callbacks
self._receive_task = None
self._monitor_websocket_task = None
async def start(self, frame: StartFrame):
await super().start(frame)
await self._params.serializer.setup(frame)
if self._params.session_timeout:
self._monitor_websocket_task = self.create_task(self._monitor_websocket())
await self._callbacks.on_client_connected(self._websocket)
await self._client.trigger_client_connected()
self._receive_task = self.create_task(self._receive_messages())
async def _stop_tasks(self):
if self._monitor_websocket_task:
await self.cancel_task(self._monitor_websocket_task)
await self.cancel_task(self._receive_task)
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self.cancel_task(self._receive_task)
await self._stop_tasks()
await self._client.disconnect()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self.cancel_task(self._receive_task)
def _iter_data(self) -> typing.AsyncIterator[bytes | str]:
if self._params.serializer.type == FrameSerializerType.BINARY:
return self._websocket.iter_bytes()
else:
return self._websocket.iter_text()
await self._stop_tasks()
await self._client.disconnect()
async def _receive_messages(self):
try:
async for message in self._iter_data():
async for message in self._client.receive():
frame = await self._params.serializer.deserialize(message)
if not frame:
@@ -106,19 +150,23 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
except Exception as e:
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")
await self._callbacks.on_client_disconnected(self._websocket)
await self._client.trigger_client_disconnected()
async def _monitor_websocket(self):
"""Wait for self._params.session_timeout seconds, if the websocket is still open, trigger timeout event."""
await asyncio.sleep(self._params.session_timeout)
await self._callbacks.on_session_timeout(self._websocket)
await self._client.trigger_client_timout()
class FastAPIWebsocketOutputTransport(BaseOutputTransport):
def __init__(self, websocket: WebSocket, params: FastAPIWebsocketParams, **kwargs):
def __init__(
self,
client: FastAPIWebsocketClient,
params: FastAPIWebsocketParams,
**kwargs,
):
super().__init__(params, **kwargs)
self._websocket = websocket
self._client = client
self._params = params
# write_raw_audio_frames() is called quickly, as soon as we get audio
@@ -134,6 +182,14 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
await self._params.serializer.setup(frame)
self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
async def stop(self, frame: EndFrame):
await super().stop(frame)
await self._client.disconnect()
async def cancel(self, frame: CancelFrame):
await super().cancel(frame)
await self._client.disconnect()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -145,7 +201,10 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
await self._write_frame(frame)
async def write_raw_audio_frames(self, frames: bytes):
if self._websocket.client_state != WebSocketState.CONNECTED:
if self._client.is_closing:
return
if not self._client.is_connected:
# Simulate audio playback with a sleep.
await self._write_audio_sleep()
return
@@ -172,25 +231,17 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
await self._write_frame(frame)
self._websocket_audio_buffer = bytes()
# Simulate audio playback with a sleep.
await self._write_audio_sleep()
async def _write_frame(self, frame: Frame):
try:
payload = await self._params.serializer.serialize(frame)
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
await self._send_data(payload)
if payload:
await self._client.send(payload)
except Exception as e:
logger.error(f"{self} exception sending data: {e.__class__.__name__} ({e})")
def _send_data(self, data: str | bytes):
if self._params.serializer.type == FrameSerializerType.BINARY:
return self._websocket.send_bytes(data)
else:
return self._websocket.send_text(data)
async def _write_audio_sleep(self):
# Simulate a clock.
current_time = time.monotonic()
@@ -219,11 +270,14 @@ class FastAPIWebsocketTransport(BaseTransport):
on_session_timeout=self._on_session_timeout,
)
is_binary = self._params.serializer.type == FrameSerializerType.BINARY
self._client = FastAPIWebsocketClient(websocket, is_binary, self._callbacks)
self._input = FastAPIWebsocketInputTransport(
websocket, self._params, self._callbacks, name=self._input_name
self._client, self._params, name=self._input_name
)
self._output = FastAPIWebsocketOutputTransport(
websocket, self._params, name=self._output_name
self._client, self._params, name=self._output_name
)
# Register supported handlers. The user will only be able to register

View File

@@ -329,6 +329,10 @@ class DailyTransportClient(EventHandler):
def _speaker_name(self):
return f"speaker-{self}"
@property
def room_url(self) -> str:
return self._room_url
@property
def participant_id(self) -> str:
return self._participant_id
@@ -1112,6 +1116,10 @@ class DailyTransport(BaseTransport):
# DailyTransport
#
@property
def room_url(self) -> str:
return self._client.room_url
@property
def participant_id(self) -> str:
return self._client.participant_id

View File

@@ -0,0 +1,27 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
def exponential_backoff_time(
attempt: int, min_wait: float = 4, max_wait: float = 10, multiplier: float = 1
) -> float:
"""Calculate exponential backoff wait time.
Args:
attempt: Current attempt number (1-based)
min_wait: Minimum wait time in seconds
max_wait: Maximum wait time in seconds
multiplier: Base multiplier for exponential calculation
Returns:
Wait time in seconds
"""
try:
exp = 2 ** (attempt - 1) * multiplier
result = max(0, min(exp, max_wait))
return max(min_wait, result)
except (ValueError, ArithmeticError):
return max_wait

View File

@@ -13,8 +13,8 @@ ENDOFSENTENCE_PATTERN_STR = r"""
(?<!Mr|Ms|Dr) # Negative lookbehind: not preceded by Mr, Ms, Dr (combined bc. length is the same)
(?<!Mrs) # Negative lookbehind: not preceded by "Mrs"
(?<!Prof) # Negative lookbehind: not preceded by "Prof"
[\.\?\!;]| # Match a period, question mark, exclamation point, or semicolon
[。?!;।] # the full-width version (mainly used in East Asian languages such as Chinese, Hindi)
(\.\s*\.\s*\.|[\.\?\!;])| # Match a period, question mark, exclamation point, or semicolon
(\\s*\\s*\。|[。?!;।]) # the full-width version (mainly used in East Asian languages such as Chinese, Hindi)
$ # End of string
"""
ENDOFSENTENCE_PATTERN = re.compile(ENDOFSENTENCE_PATTERN_STR, re.VERBOSE)

View File

@@ -2,6 +2,7 @@ aiohttp~=3.10.3
anthropic~=0.30.0
azure-cognitiveservices-speech~=1.40.0
boto3~=1.35.27
cartesia~=1.3.1
daily-python~=0.11.0
deepgram-sdk~=3.5.0
fal-client~=0.4.1
@@ -15,6 +16,7 @@ langchain~=0.2.14
livekit~=0.13.1
lmnt~=1.1.4
loguru~=0.7.2
Markdown~=3.7
numpy~=1.26.4
openai~=1.37.2
openpipe~=4.24.0
@@ -28,5 +30,4 @@ silero-vad~=5.1
soxr~=0.5.0
together~=1.2.7
transformers~=4.48.0
websockets~=13.1
Markdown~=3.7
websockets~=13.1

View File

@@ -11,12 +11,13 @@ from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InputAudioRawFrame,
STTMuteFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.filters.stt_mute_filter import STTMuteConfig, STTMuteFilter, STTMuteStrategy
from pipecat.tests.utils import run_test
from pipecat.tests.utils import SleepFrame, run_test
class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
@@ -26,10 +27,14 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
BotStartedSpeakingFrame(), # First bot speech starts
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # First bot speech ends
BotStartedSpeakingFrame(), # Second bot speech
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStoppedSpeakingFrame(),
]
@@ -41,6 +46,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
STTMuteFrame, # mute=False
BotStartedSpeakingFrame,
UserStartedSpeakingFrame, # Now passes through
InputAudioRawFrame, # Now passes through
UserStoppedSpeakingFrame, # Now passes through
BotStoppedSpeakingFrame,
]
@@ -57,12 +63,19 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
BotStartedSpeakingFrame(), # First speech starts
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # First speech ends
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStartedSpeakingFrame(), # Second speech starts
UserStartedSpeakingFrame(), # Should be suppressed again
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed again
UserStoppedSpeakingFrame(), # Should be suppressed again
BotStoppedSpeakingFrame(), # Second speech ends
]
@@ -73,6 +86,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
BotStoppedSpeakingFrame,
STTMuteFrame, # mute=False
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
STTMuteFrame, # mute=True
@@ -134,15 +148,23 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
UserStartedSpeakingFrame(), # Should be suppressed (starts muted)
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStartedSpeakingFrame(), # First bot speech
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # First speech ends, unmutes
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStartedSpeakingFrame(), # Second speech
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStoppedSpeakingFrame(),
]
@@ -153,9 +175,11 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
BotStoppedSpeakingFrame,
STTMuteFrame, # mute=False after first speech
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStoppedSpeakingFrame,
]
@@ -190,23 +214,30 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStartedSpeakingFrame(), # Bot starts speaking
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # Bot stops speaking
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
]
expected_returned_frames = [
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
STTMuteFrame, # mute=True
BotStoppedSpeakingFrame,
STTMuteFrame, # mute=False
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
]

View File

@@ -0,0 +1,34 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.utils.network import exponential_backoff_time
class TestUtilsNetwork(unittest.IsolatedAsyncioTestCase):
async def test_exponential_backoff_time(self):
# min_wait=4, max_wait=10, multiplier=1
assert exponential_backoff_time(attempt=1, min_wait=4, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=2, min_wait=4, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=3, min_wait=4, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=4, min_wait=4, max_wait=10, multiplier=1) == 8
assert exponential_backoff_time(attempt=5, min_wait=4, max_wait=10, multiplier=1) == 10
assert exponential_backoff_time(attempt=6, min_wait=4, max_wait=10, multiplier=1) == 10
# min_wait=1, max_wait=10, multiplier=1
assert exponential_backoff_time(attempt=1, min_wait=1, max_wait=10, multiplier=1) == 1
assert exponential_backoff_time(attempt=2, min_wait=1, max_wait=10, multiplier=1) == 2
assert exponential_backoff_time(attempt=3, min_wait=1, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=4, min_wait=1, max_wait=10, multiplier=1) == 8
assert exponential_backoff_time(attempt=5, min_wait=1, max_wait=10, multiplier=1) == 10
assert exponential_backoff_time(attempt=6, min_wait=1, max_wait=10, multiplier=1) == 10
# min_wait=1, max_wait=20, multiplier=2
assert exponential_backoff_time(attempt=1, min_wait=1, max_wait=20, multiplier=2) == 2
assert exponential_backoff_time(attempt=2, min_wait=1, max_wait=20, multiplier=2) == 4
assert exponential_backoff_time(attempt=3, min_wait=1, max_wait=20, multiplier=2) == 8
assert exponential_backoff_time(attempt=4, min_wait=1, max_wait=20, multiplier=2) == 16
assert exponential_backoff_time(attempt=5, min_wait=1, max_wait=20, multiplier=2) == 20
assert exponential_backoff_time(attempt=6, min_wait=1, max_wait=20, multiplier=2) == 20

View File

@@ -11,10 +11,13 @@ from pipecat.utils.string import match_endofsentence
class TestUtilsString(unittest.IsolatedAsyncioTestCase):
async def test_endofsentence(self):
assert match_endofsentence("This is a sentence.")
assert match_endofsentence("This is a sentence! ")
assert match_endofsentence("This is a sentence?")
assert match_endofsentence("This is a sentence;")
assert match_endofsentence("This is a sentence.") == 19
assert match_endofsentence("This is a sentence!") == 19
assert match_endofsentence("This is a sentence?") == 19
assert match_endofsentence("This is a sentence;") == 19
assert match_endofsentence("This is a sentence...") == 21
assert match_endofsentence("This is a sentence . . .") == 24
assert match_endofsentence("This is a sentence. ..") == 22
assert not match_endofsentence("This is not a sentence")
assert not match_endofsentence("This is not a sentence,")
assert not match_endofsentence("This is not a sentence, ")