Compare commits

...

27 Commits

Author SHA1 Message Date
James Hush
50e9ef11e5 Update example 2025-02-20 13:04:34 +08:00
James Hush
2cd8ff3848 Recording issue 2025-02-20 12:57:06 +08:00
Filipi da Silva Fuchter
77e777b1ce Merge pull request #1249 from pipecat-ai/invoking_call_start_function
Fixed an issue that `start_callback` was not invoked for some LLM services
2025-02-19 18:09:00 -03:00
Filipi Fuchter
7e7926059c Fixed an issue that start_callback was not invoked for some LLM services. 2025-02-19 18:04:20 -03:00
Aleix Conchillo Flaqué
c948754eff Merge pull request #1248 from pipecat-ai/aleix/daily-transport-room-url
daily: add room_url property
2025-02-19 09:46:46 -08:00
Aleix Conchillo Flaqué
83f1a8830d daily: add room_url property 2025-02-19 09:29:53 -08:00
James Hush
80f8e05fcf docs: fix transcripts in translation chatbot example (#1199) 2025-02-19 16:07:22 +08:00
Aleix Conchillo Flaqué
afd1a1e80b Merge pull request #1245 from pipecat-ai/aleix/stt-mute-filter-trace-logging 2025-02-18 21:21:55 -08:00
Aleix Conchillo Flaqué
84ac88cad7 STTMuteFilter: change suppressed logging to trace 2025-02-18 18:03:37 -08:00
Aleix Conchillo Flaqué
211163e5c7 Merge pull request #1241 from pipecat-ai/aleix/deepgram-nova-3
deepgram: use the new nova-3 model as default
2025-02-18 17:53:04 -08:00
Aleix Conchillo Flaqué
1b0bcebef6 deepgram: use the new nova-3 model as default 2025-02-18 17:51:54 -08:00
Aleix Conchillo Flaqué
89736b03c4 Merge pull request #1243 from pipecat-ai/aleix/add-deepgram-addons
deepgram: add ability to provide custom addons
2025-02-18 17:47:48 -08:00
Aleix Conchillo Flaqué
4edda718ed deepgram: add ability to provide custom addons 2025-02-18 17:45:41 -08:00
Aleix Conchillo Flaqué
22a62edc9e Merge pull request #1242 from pipecat-ai/aleix/utils-network-exponential
network: added exponential_backoff_time() function
2025-02-18 17:44:21 -08:00
Aleix Conchillo Flaqué
50b6cc8135 network: added exponential_backoff_time() function 2025-02-18 17:42:43 -08:00
Aleix Conchillo Flaqué
45cf36925a Merge pull request #1240 from pipecat-ai/aleix/handle-deepgram-on-error
deepgram: handle error event and reconnect
2025-02-18 17:41:29 -08:00
Filipi da Silva Fuchter
83a71e1fec Merge pull request #1112 from pipecat-ai/bot-ready-signalling-rn
React Native client for the bot ready example.
2025-02-18 15:17:38 -03:00
Filipi Fuchter
e809c8680e Upgrading to use the latest node stable version 2025-02-18 15:12:44 -03:00
Aleix Conchillo Flaqué
c926063d74 deepgram: handle error event and reconnect 2025-02-18 09:52:18 -08:00
Aleix Conchillo Flaqué
0334550356 Merge pull request #1238 from pipecat-ai/aleix/stt-mute-filter-ignore-input-audio-frames
STTMuteFilter: ignore audio frames so no transcriptions are generated
2025-02-18 09:48:13 -08:00
Aleix Conchillo Flaqué
90b9dce710 STTMuteFilter: ignore audio frames so no transcriptions are generated 2025-02-17 19:59:05 -08:00
Filipi Fuchter
7e3e126730 Migrating the base API URL for the react native example to an .env file. 2025-01-30 10:42:16 -03:00
Filipi Fuchter
75ca0571bb Improving the layout from the bot ready react native demo. 2025-01-30 10:31:04 -03:00
Filipi Fuchter
a48e5d0714 Only sending the message when it is a remote audio track. 2025-01-30 10:14:37 -03:00
Filipi Fuchter
2b6a992207 Sending the app-message to start playing audio once the track has started. 2025-01-30 09:37:33 -03:00
Filipi Fuchter
24cf106ed2 Refactoring the code to ask for the room that it should connect. 2025-01-30 09:14:18 -03:00
Filipi Fuchter
95c8346cb5 Starting to create a react native client for the bot ready example. 2025-01-29 19:00:42 -03:00
29 changed files with 11531 additions and 43 deletions

15
.gitignore vendored
View File

@@ -32,6 +32,21 @@ fly.toml
# Example files
pipecat/examples/twilio-chatbot/templates/streams.xml
pipecat/examples/bot-ready-signalling/client/react-native/node_modules/
pipecat/examples/bot-ready-signalling/client/react-native/.expo/
pipecat/examples/bot-ready-signalling/client/react-native/dist/
pipecat/examples/bot-ready-signalling/client/react-native/npm-debug.*
pipecat/examples/bot-ready-signalling/client/react-native/*.jks
pipecat/examples/bot-ready-signalling/client/react-native/*.p8
pipecat/examples/bot-ready-signalling/client/react-native/*.p12
pipecat/examples/bot-ready-signalling/client/react-native/*.key
pipecat/examples/bot-ready-signalling/client/react-native/*.mobileprovision
pipecat/examples/bot-ready-signalling/client/react-native/*.orig.*
pipecat/examples/bot-ready-signalling/client/react-native/web-build/
# macOS
.DS_Store
# Documentation
docs/api/_build/

View File

@@ -5,6 +5,37 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### Added
- Added `room_url` property to `DailyTransport`.
- Added `addons` argument to `DeepgramSTTService`.
- Added `exponential_backoff_time()` to `utils.network` module.
### Changed
- `DeepgramSTTService` now uses the new `nova-3` model by default. If you want
to use the previous model you can pass `LiveOptions(model="nova-2-general")`.
(see https://deepgram.com/learn/introducing-nova-3-speech-to-text-api)
```python
stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
```
### Fixed
- Fixed an issue that `start_callback` was not invoked for some LLM services.
- Fixed an issue that would cause `DeepgramSTTService` to stop working after an
error occurred (e.g. sudden network loss). If the network recovered we would
not reconnect.
- Fixed a `STTMuteFilter` issue that would not mute user audio frames causing
transcriptions to be generated by the STT service.
## [0.0.57] - 2025-02-14
### Added

View File

@@ -0,0 +1 @@
22.14

View File

@@ -0,0 +1,60 @@
# React Native Implementation
Basic implementation using the [Pipecat React Native SDK](https://docs.pipecat.ai/client/react-native/introduction).
## Usage
### Expo requirements
This project cannot be used with an [Expo Go](https://docs.expo.dev/workflow/expo-go/) app because [it requires custom native code](https://docs.expo.io/workflow/customizing/).
When a project requires custom native code or a config plugin, we need to transition from using [Expo Go](https://docs.expo.dev/workflow/expo-go/)
to a [development build](https://docs.expo.dev/development/introduction/).
More details about the custom native code used by this demo can be found in [rn-daily-js-expo-config-plugin](https://github.com/daily-co/rn-daily-js-expo-config-plugin).
### Building remotely
If you do not have experience with Xcode and Android Studio builds or do not have them installed locally on your computer, you will need to follow [this guide from Expo to use EAS Build](https://docs.expo.dev/development/create-development-builds/#create-and-install-eas-build).
### Building locally
You will need to have installed locally on your computer:
- [Xcode](https://developer.apple.com/xcode/) to build for iOS;
- [Android Studio](https://developer.android.com/studio) to build for Android;
#### Install the demo dependencies
```bash
# Use the version of node specified in .nvmrc
nvm i
# Install dependencies
npm i
# Before a native app can be compiled, the native source code must be generated.
npx expo prebuild
# Configure the environment variable to connect to the local server
cp env.example .env
# edit .env and add your local ip address, for example: http://192.168.1.16:7860
```
#### Running on Android
After plugging in an Android device [configured for debugging](https://developer.android.com/studio/debug/dev-options), run the following command:
```
npm run android
```
#### Running on iOS
Run the following command:
```
npm run ios
```
#### Connect to the server
Use the http://localhost:5173 in your app.

View File

@@ -0,0 +1,75 @@
{
"expo": {
"name": "bot-ready-rn",
"slug": "bot-ready-rn",
"version": "1.0.0",
"orientation": "portrait",
"icon": "./assets/icon.png",
"userInterfaceStyle": "light",
"splash": {
"image": "./assets/splash.png",
"resizeMode": "contain",
"backgroundColor": "#ffffff"
},
"updates": {
"fallbackToCacheTimeout": 0
},
"assetBundlePatterns": [
"**/*"
],
"ios": {
"supportsTablet": true,
"bitcode": false,
"bundleIdentifier": "co.daily.expo.BotReady",
"infoPlist": {
"UIBackgroundModes": [
"voip"
]
},
"appleTeamId": "EEBGKV9N3N"
},
"android": {
"adaptiveIcon": {
"foregroundImage": "./assets/adaptive-icon.png",
"backgroundColor": "#FFFFFF"
},
"package": "co.daily.expo.BotReady",
"permissions": [
"android.permission.ACCESS_NETWORK_STATE",
"android.permission.BLUETOOTH",
"android.permission.CAMERA",
"android.permission.INTERNET",
"android.permission.MODIFY_AUDIO_SETTINGS",
"android.permission.RECORD_AUDIO",
"android.permission.SYSTEM_ALERT_WINDOW",
"android.permission.WAKE_LOCK",
"android.permission.FOREGROUND_SERVICE",
"android.permission.FOREGROUND_SERVICE_CAMERA",
"android.permission.FOREGROUND_SERVICE_MICROPHONE",
"android.permission.FOREGROUND_SERVICE_MEDIA_PROJECTION",
"android.permission.POST_NOTIFICATIONS"
]
},
"web": {
"favicon": "./assets/favicon.png"
},
"plugins": [
"@config-plugins/react-native-webrtc",
"@daily-co/config-plugin-rn-daily-js",
[
"expo-build-properties",
{
"android": {
"minSdkVersion": 24,
"compileSdkVersion": 35,
"targetSdkVersion": 34,
"buildToolsVersion": "35.0.0"
},
"ios": {
"deploymentTarget": "15.1"
}
}
]
]
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

View File

@@ -0,0 +1,7 @@
module.exports = function(api) {
api.cache(true);
return {
presets: ['babel-preset-expo'],
plugins: [["module:react-native-dotenv"]],
};
};

View File

@@ -0,0 +1 @@
API_BASE_URL=http://YOUR_LOCAL_IP:7860

View File

@@ -0,0 +1,7 @@
import { registerRootComponent } from "expo";
import App from "./src/App";
// registerRootComponent calls AppRegistry.registerComponent('main', () => App);
// It also ensures that the environment is set up appropriately
registerRootComponent(App);

View File

@@ -0,0 +1,4 @@
// Learn more https://docs.expo.io/guides/customizing-metro
const { getDefaultConfig } = require('expo/metro-config');
module.exports = getDefaultConfig(__dirname);

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,31 @@
{
"name": "bot-ready-rn",
"version": "1.0.0",
"scripts": {
"start": "expo start --dev-client",
"android": "expo run:android --device",
"ios": "expo run:ios --device",
"web": "expo start --web"
},
"dependencies": {
"@config-plugins/react-native-webrtc": "^10.0.0",
"@daily-co/config-plugin-rn-daily-js": "0.0.7",
"@daily-co/react-native-daily-js": "^0.70.0",
"@daily-co/react-native-webrtc": "^118.0.3-daily.2",
"@react-native-async-storage/async-storage": "1.23.1",
"expo": "^52.0.0",
"expo-build-properties": "~0.13.1",
"expo-dev-client": "~5.0.5",
"expo-splash-screen": "~0.29.16",
"expo-status-bar": "~2.0.0",
"react": "18.3.1",
"react-native": "0.76.3",
"react-native-background-timer": "^2.4.1",
"react-native-dotenv": "^3.4.11",
"react-native-get-random-values": "^1.11.0"
},
"devDependencies": {
"@babel/core": "^7.12.9"
},
"private": true
}

View File

@@ -0,0 +1,121 @@
import React, { useState, useEffect } from 'react';
import {SafeAreaView, View, Text, Button, StyleSheet, ScrollView} from 'react-native';
import Daily from "@daily-co/react-native-daily-js";
import { API_BASE_URL } from "@env";
const CallScreen = () => {
const [connectionStatus, setConnectionStatus] = useState('Disconnected');
const [isConnected, setIsConnected] = useState(false);
const [callObject, setCallObject] = useState(null);
const [logs, setLogs] = useState([]);
useEffect(() => {
if (callObject) {
setupTrackListeners(callObject);
}
}, [callObject]);
const log = (message) => {
setLogs((prevLogs) => [...prevLogs, `${new Date().toISOString()} - ${message}`]);
console.log(message);
};
const setupTrackListeners = (callObject) => {
callObject.on("joined-meeting", () => {
setConnectionStatus('Connected');
setIsConnected(true);
log('Client connected');
});
callObject.on("left-meeting", () => {
setConnectionStatus('Disconnected');
setIsConnected(false);
log('Client disconnected');
});
callObject.on("participant-left", () => {
// When the bot leaves, we are also disconnecting from the call
disconnect().catch((err) => {
log(`Failed to disconnect ${err}`);
})
});
// Trigger so the bot can start sending audio
callObject.on("track-started", (evt) => {
if (evt.track.kind === "audio" && evt.participant.local === false) {
handleEventToConsole(evt)
log("Sending the message that will trigger the bot to play the audio.")
callObject.sendAppMessage("playable")
}
});
callObject.on("error", (evt) => log(`Error: ${evt.error}`));
// Other events just for awareness
callObject.on("track-stopped", handleEventToConsole);
callObject.on("participant-joined", handleEventToConsole);
callObject.on("participant-updated", handleEventToConsole);
};
const handleEventToConsole = (evt) => {
log(`Received event: ${evt.action}`);
};
const connect = async () => {
try {
const callObject = Daily.createCallObject({ subscribeToTracksAutomatically: true });
setCallObject(callObject);
const connectionUrl = `${API_BASE_URL}/connect`
const res = await fetch(connectionUrl, { method: "POST", headers: { "Content-Type": "application/json" } });
const roomInfo = await res.json();
await callObject.join({ url: roomInfo.room_url });
} catch (error) {
log(`Error connecting: ${error.message}`);
}
};
const disconnect = async () => {
if (callObject) {
try {
await callObject.leave();
await callObject.destroy();
setCallObject(null);
} catch (error) {
log(`Error disconnecting: ${error.message}`);
}
}
};
return (
<SafeAreaView style={styles.safeArea}>
<View style={styles.container}>
<View style={styles.statusBar}>
<Text>Status: <Text style={styles.status}>{connectionStatus}</Text></Text>
<View style={styles.controls}>
<Button
title={isConnected ? "Disconnect" : "Connect"}
onPress={isConnected ? disconnect : connect}
/>
</View>
</View>
<View style={styles.debugPanel}>
<Text style={styles.debugTitle}>Debug Info</Text>
<ScrollView style={styles.debugLog}>
{logs.map((logEntry, index) => (
<Text key={index} style={styles.logText}>{logEntry}</Text>
))}
</ScrollView>
</View>
</View>
</SafeAreaView>
);
};
const styles = StyleSheet.create({
safeArea: { flex: 1, backgroundColor: '#f0f0f0', padding: 20 },
container: { flex: 1, margin: 20 },
statusBar: { flexDirection: 'row', justifyContent: 'space-between', alignItems: 'center', padding: 10, backgroundColor: '#fff', borderRadius: 8, marginBottom: 20 },
status: { fontWeight: 'bold' },
controls: { flexDirection: 'row', gap: 10 },
debugPanel: { height: '80%', backgroundColor: '#fff', borderRadius: 8, padding: 20},
debugTitle: { fontSize: 16, fontWeight: 'bold' },
debugLog: { height: '100%', overflow: 'scroll', backgroundColor: '#f8f8f8', padding: 10, borderRadius: 4, fontFamily: 'monospace', fontSize: 12, lineHeight: 1.4 },
});
export default CallScreen;

View File

@@ -21,6 +21,11 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.transports.services.helpers.daily_rest import (
DailyMeetingTokenParams,
DailyRESTHelper,
DailyRoomParams,
)
load_dotenv(override=True)
@@ -30,10 +35,31 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
daily_rest_helper = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY"),
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=session,
)
room = await daily_rest_helper.create_room(
params=DailyRoomParams(properties={"enable_recording": "cloud"})
)
params = DailyMeetingTokenParams(
properties={
"enable_recording": "cloud",
"start_cloud_recording": True,
}
)
token = await daily_rest_helper.get_token(
room_url=room.url, expiry_time=60 * 60, params=params
)
logger.debug(f"Room URL: {room.url} Room token: {token}")
transport = DailyTransport(
room_url,
room.url,
token,
"Respond bot",
DailyParams(
@@ -85,6 +111,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# await transport.start_recording()
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -14,6 +14,7 @@ from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -30,6 +31,12 @@ logger.add(sys.stderr, level="DEBUG")
video_participant_id = None
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
@@ -63,7 +70,7 @@ async def main():
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
llm.register_function("get_weather", get_weather)
llm.register_function("get_weather", get_weather, start_fetch_weather)
llm.register_function("get_image", get_image)
tools = [

View File

@@ -24,17 +24,15 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import (
DailyParams,
DailyTransport,
)
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -166,21 +164,32 @@ async def main():
async def on_transcript_update(processor, frame):
await transcript_handler.on_transcript_update(processor, frame)
rtvi = RTVIProcessor()
pipeline = Pipeline(
[
transport.input(),
rtvi,
stt,
transcript.user(), # User transcripts
tp,
llm,
tts,
transport.output(),
transcript.assistant(),
context_aggregator.assistant(),
transcript.assistant(), # Assistant transcripts
]
)
task = PipelineTask(pipeline)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=False, # We don't want to interrupt the translator bot
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -1,4 +1,4 @@
python-dotenv
fastapi[all]
pipecat-ai[daily,openai,azure]
pipecat-ai[cartesia,daily,deepgram,openai,silero]
aiohttp

View File

@@ -23,6 +23,7 @@ from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InputAudioRawFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
@@ -185,13 +186,14 @@ class STTMuteFilter(FrameProcessor):
StopInterruptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
InputAudioRawFrame,
),
):
# Only pass VAD-related frames when not muted
if not self.is_muted:
await self.push_frame(frame, direction)
else:
logger.debug(f"{frame.__class__.__name__} suppressed - STT currently muted")
logger.trace(f"{frame.__class__.__name__} suppressed - STT currently muted")
else:
# Pass all other frames through
await self.push_frame(frame, direction)

View File

@@ -175,6 +175,7 @@ class LLMService(AIService):
f = self._callbacks[None]
else:
return None
await self.call_start_function(context, function_name)
await context.call_function(
f,
function_name=function_name,

View File

@@ -5,7 +5,7 @@
#
import asyncio
from typing import AsyncGenerator, Optional
from typing import AsyncGenerator, Dict, Optional
from loguru import logger
@@ -34,6 +34,7 @@ try:
AsyncListenWebSocketClient,
DeepgramClient,
DeepgramClientOptions,
ErrorResponse,
LiveOptions,
LiveResultResponse,
LiveTranscriptionEvents,
@@ -120,6 +121,7 @@ class DeepgramSTTService(STTService):
url: str = "",
sample_rate: Optional[int] = None,
live_options: Optional[LiveOptions] = None,
addons: Optional[Dict] = None,
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
@@ -127,7 +129,7 @@ class DeepgramSTTService(STTService):
default_options = LiveOptions(
encoding="linear16",
language=Language.EN,
model="nova-2-general",
model="nova-3-general",
channels=1,
interim_results=True,
smart_format=True,
@@ -147,6 +149,7 @@ class DeepgramSTTService(STTService):
merged_options.language = merged_options.language.value
self._settings = merged_options.to_dict()
self._addons = addons
self._client = DeepgramClient(
api_key,
@@ -155,13 +158,10 @@ class DeepgramSTTService(STTService):
options={"keepalive": "true"}, # verbose=logging.DEBUG
),
)
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
if self.vad_enabled:
self._register_event_handler("on_speech_started")
self._register_event_handler("on_utterance_end")
self._connection.on(LiveTranscriptionEvents.SpeechStarted, self._on_speech_started)
self._connection.on(LiveTranscriptionEvents.UtteranceEnd, self._on_utterance_end)
@property
def vad_enabled(self):
@@ -202,7 +202,25 @@ class DeepgramSTTService(STTService):
async def _connect(self):
logger.debug("Connecting to Deepgram")
if not await self._connection.start(self._settings):
self._connection: AsyncListenWebSocketClient = self._client.listen.asyncwebsocket.v("1")
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), self._on_message
)
self._connection.on(LiveTranscriptionEvents(LiveTranscriptionEvents.Error), self._on_error)
if self.vad_enabled:
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted),
self._on_speech_started,
)
self._connection.on(
LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd),
self._on_utterance_end,
)
if not await self._connection.start(options=self._settings, addons=self._addons):
logger.error(f"{self}: unable to connect to Deepgram")
async def _disconnect(self):
@@ -214,6 +232,15 @@ class DeepgramSTTService(STTService):
await self.start_ttfb_metrics()
await self.start_processing_metrics()
async def _on_error(self, *args, **kwargs):
error: ErrorResponse = kwargs["error"]
logger.warning(f"{self} connection error, will retry: {error}")
await self.stop_all_metrics()
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
# because this triggers more errors internally in the Deepgram SDK. So,
# we just forget about the previous connection and create a new one.
await self._connect()
async def _on_speech_started(self, *args, **kwargs):
await self.start_metrics()
await self._call_event_handler("on_speech_started", *args, **kwargs)

View File

@@ -266,7 +266,6 @@ class BaseOpenAILLMService(LLMService):
if tool_call.function and tool_call.function.name:
function_name += tool_call.function.name
tool_call_id = tool_call.id
await self.call_start_function(context, function_name)
if tool_call.function and tool_call.function.arguments:
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments

View File

@@ -13,6 +13,7 @@ from loguru import logger
from websockets.protocol import State
from pipecat.frames.frames import ErrorFrame
from pipecat.utils.network import exponential_backoff_time
class WebsocketService(ABC):
@@ -51,27 +52,6 @@ class WebsocketService(ABC):
await self._connect_websocket()
return await self._verify_connection()
def _calculate_wait_time(
self, attempt: int, min_wait: float = 4, max_wait: float = 10, multiplier: float = 1
) -> float:
"""Calculate exponential backoff wait time.
Args:
attempt: Current attempt number (1-based)
min_wait: Minimum wait time in seconds
max_wait: Maximum wait time in seconds
multiplier: Base multiplier for exponential calculation
Returns:
Wait time in seconds
"""
try:
exp = 2 ** (attempt - 1) * multiplier
result = max(0, min(exp, max_wait))
return max(min_wait, result)
except (ValueError, ArithmeticError):
return max_wait
async def _receive_task_handler(self, report_error: Callable[[ErrorFrame], Awaitable[None]]):
"""Handles WebSocket message receiving with automatic retry logic.
@@ -104,7 +84,7 @@ class WebsocketService(ABC):
try:
if await self._reconnect_websocket(retry_count):
retry_count = 0 # Reset counter on successful reconnection
wait_time = self._calculate_wait_time(retry_count)
wait_time = exponential_backoff_time(retry_count)
await asyncio.sleep(wait_time)
except Exception as reconnect_error:
logger.error(f"{self} reconnection failed: {reconnect_error}")

View File

@@ -329,6 +329,10 @@ class DailyTransportClient(EventHandler):
def _speaker_name(self):
return f"speaker-{self}"
@property
def room_url(self) -> str:
return self._room_url
@property
def participant_id(self) -> str:
return self._participant_id
@@ -1112,6 +1116,10 @@ class DailyTransport(BaseTransport):
# DailyTransport
#
@property
def room_url(self) -> str:
return self._client.room_url
@property
def participant_id(self) -> str:
return self._client.participant_id

View File

@@ -0,0 +1,27 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
def exponential_backoff_time(
attempt: int, min_wait: float = 4, max_wait: float = 10, multiplier: float = 1
) -> float:
"""Calculate exponential backoff wait time.
Args:
attempt: Current attempt number (1-based)
min_wait: Minimum wait time in seconds
max_wait: Maximum wait time in seconds
multiplier: Base multiplier for exponential calculation
Returns:
Wait time in seconds
"""
try:
exp = 2 ** (attempt - 1) * multiplier
result = max(0, min(exp, max_wait))
return max(min_wait, result)
except (ValueError, ArithmeticError):
return max_wait

View File

@@ -11,12 +11,13 @@ from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InputAudioRawFrame,
STTMuteFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.filters.stt_mute_filter import STTMuteConfig, STTMuteFilter, STTMuteStrategy
from pipecat.tests.utils import run_test
from pipecat.tests.utils import SleepFrame, run_test
class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
@@ -26,10 +27,14 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
BotStartedSpeakingFrame(), # First bot speech starts
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # First bot speech ends
BotStartedSpeakingFrame(), # Second bot speech
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStoppedSpeakingFrame(),
]
@@ -41,6 +46,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
STTMuteFrame, # mute=False
BotStartedSpeakingFrame,
UserStartedSpeakingFrame, # Now passes through
InputAudioRawFrame, # Now passes through
UserStoppedSpeakingFrame, # Now passes through
BotStoppedSpeakingFrame,
]
@@ -57,12 +63,19 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
BotStartedSpeakingFrame(), # First speech starts
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # First speech ends
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStartedSpeakingFrame(), # Second speech starts
UserStartedSpeakingFrame(), # Should be suppressed again
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed again
UserStoppedSpeakingFrame(), # Should be suppressed again
BotStoppedSpeakingFrame(), # Second speech ends
]
@@ -73,6 +86,7 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
BotStoppedSpeakingFrame,
STTMuteFrame, # mute=False
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
STTMuteFrame, # mute=True
@@ -134,15 +148,23 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
UserStartedSpeakingFrame(), # Should be suppressed (starts muted)
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStartedSpeakingFrame(), # First bot speech
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # First speech ends, unmutes
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStartedSpeakingFrame(), # Second speech
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStoppedSpeakingFrame(),
]
@@ -153,9 +175,11 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
BotStoppedSpeakingFrame,
STTMuteFrame, # mute=False after first speech
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStoppedSpeakingFrame,
]
@@ -190,23 +214,30 @@ class TestSTTMuteFilter(unittest.IsolatedAsyncioTestCase):
frames_to_send = [
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
BotStartedSpeakingFrame(), # Bot starts speaking
UserStartedSpeakingFrame(), # Should be suppressed
InputAudioRawFrame(
audio=b"", sample_rate=16000, num_channels=1
), # Should be suppressed
UserStoppedSpeakingFrame(), # Should be suppressed
BotStoppedSpeakingFrame(), # Bot stops speaking
UserStartedSpeakingFrame(), # Should pass through
InputAudioRawFrame(audio=b"", sample_rate=16000, num_channels=1), # Should pass through
UserStoppedSpeakingFrame(), # Should pass through
]
expected_returned_frames = [
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
STTMuteFrame, # mute=True
BotStoppedSpeakingFrame,
STTMuteFrame, # mute=False
UserStartedSpeakingFrame,
InputAudioRawFrame,
UserStoppedSpeakingFrame,
]

View File

@@ -0,0 +1,34 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.utils.network import exponential_backoff_time
class TestUtilsNetwork(unittest.IsolatedAsyncioTestCase):
async def test_exponential_backoff_time(self):
# min_wait=4, max_wait=10, multiplier=1
assert exponential_backoff_time(attempt=1, min_wait=4, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=2, min_wait=4, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=3, min_wait=4, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=4, min_wait=4, max_wait=10, multiplier=1) == 8
assert exponential_backoff_time(attempt=5, min_wait=4, max_wait=10, multiplier=1) == 10
assert exponential_backoff_time(attempt=6, min_wait=4, max_wait=10, multiplier=1) == 10
# min_wait=1, max_wait=10, multiplier=1
assert exponential_backoff_time(attempt=1, min_wait=1, max_wait=10, multiplier=1) == 1
assert exponential_backoff_time(attempt=2, min_wait=1, max_wait=10, multiplier=1) == 2
assert exponential_backoff_time(attempt=3, min_wait=1, max_wait=10, multiplier=1) == 4
assert exponential_backoff_time(attempt=4, min_wait=1, max_wait=10, multiplier=1) == 8
assert exponential_backoff_time(attempt=5, min_wait=1, max_wait=10, multiplier=1) == 10
assert exponential_backoff_time(attempt=6, min_wait=1, max_wait=10, multiplier=1) == 10
# min_wait=1, max_wait=20, multiplier=2
assert exponential_backoff_time(attempt=1, min_wait=1, max_wait=20, multiplier=2) == 2
assert exponential_backoff_time(attempt=2, min_wait=1, max_wait=20, multiplier=2) == 4
assert exponential_backoff_time(attempt=3, min_wait=1, max_wait=20, multiplier=2) == 8
assert exponential_backoff_time(attempt=4, min_wait=1, max_wait=20, multiplier=2) == 16
assert exponential_backoff_time(attempt=5, min_wait=1, max_wait=20, multiplier=2) == 20
assert exponential_backoff_time(attempt=6, min_wait=1, max_wait=20, multiplier=2) == 20