Compare commits

..

1 Commits

Author SHA1 Message Date
James Hush
f039ece2c0 feat: nova-3 example 2025-02-18 11:24:02 +08:00
214 changed files with 1395 additions and 16025 deletions

15
.gitignore vendored
View File

@@ -32,21 +32,6 @@ fly.toml
# Example files
pipecat/examples/twilio-chatbot/templates/streams.xml
pipecat/examples/bot-ready-signalling/client/react-native/node_modules/
pipecat/examples/bot-ready-signalling/client/react-native/.expo/
pipecat/examples/bot-ready-signalling/client/react-native/dist/
pipecat/examples/bot-ready-signalling/client/react-native/npm-debug.*
pipecat/examples/bot-ready-signalling/client/react-native/*.jks
pipecat/examples/bot-ready-signalling/client/react-native/*.p8
pipecat/examples/bot-ready-signalling/client/react-native/*.p12
pipecat/examples/bot-ready-signalling/client/react-native/*.key
pipecat/examples/bot-ready-signalling/client/react-native/*.mobileprovision
pipecat/examples/bot-ready-signalling/client/react-native/*.orig.*
pipecat/examples/bot-ready-signalling/client/react-native/web-build/
# macOS
.DS_Store
# Documentation
docs/api/_build/

View File

@@ -1,8 +1,7 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.9.7
- repo: local
hooks:
- id: ruff
language_version: python3
args: [ --select, I, ]
- id: ruff-format
- id: ruff-format-hook
name: Check ruff formatting
entry: sh scripts/pre-commit.sh
language: system

View File

@@ -5,214 +5,6 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- Added a `flush_audio()` method to `AzureTTSService`, `FishTTSService`,
`PlayHTTTSService`, and `LMNTTTSService`.
- Added `set_language()` and `set_model()` to `AzureSTTService`,
`AssemblySTTService`, and `GladiaSTTService`.
- Added `on_user_turn_audio_data` and `on_bot_turn_audio_data` to
`AudioBufferProcessor`. This gives the ability to grab the audio of only that
turn for both the user and the bot.
- Added new base class `BaseObject` which is now the base class of
`FrameProcessor`, `PipelineRunner`, `PipelineTask` and `BaseTransport`. The
new `BaseObject` adds supports for event handlers.
- Added support for a unified format for specifying function calling across all
LLM services.
```python
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function])
```
- Added `speech_threshold` parameter to `GladiaSTTService`.
- Allow passing user (`user_kwargs`) and assistant (`assistant_kwargs`) context
aggregator parameters when using `create_context_aggregator()`. The values are
passed as a mapping that will then be converted to arguments.
- Added `speed` as an `InputParam` for both `ElevenLabsTTSService` and
`ElevenLabsHttpTTSService`.
- Added new `LLMFullResponseAggregator` to aggregate full LLM completions. At
every completion the `on_completion` event handler is triggered.
- Added a new frame, `RTVIServerMessageFrame`, and RTVI message
`RTVIServerMessage` which provides a generic mechanism for sending custom
messages from server to client. The `RTVIServerMessageFrame` is processed by
the `RTVIObserver` and will be delivered to the client's `onServerMessage`
callback or `ServerMessage` event.
- Added `GoogleLLMOpenAIBetaService` for Google LLM integration with an
OpenAI-compatible interface. Added foundational example
`14o-function-calling-gemini-openai-format.py`.
- Added `AzureRealtimeBetaLLMService` to support Azure's OpeanAI Realtime API. Added
foundational example `19a-azure-realtime-beta.py`.
### Changed
- Moved `flush_audio()` from the `TTSService` base class to
`WebsocketTTSService`.
### Fixed
- Fixed an issue in `GoogleSTTService`, where it didn't have a `set_language`
function. This required a name change from `set_languages` to `set_language`.
## [0.0.58] - 2025-02-26
### Added
- Added track-specific audio event `on_track_audio_data` to
`AudioBufferProcessor` for accessing separate input and output audio tracks.
- Pipecat version will now be logged on every application startup. This will
help us identify what version we are running in case of any issues.
- Added a new `StopFrame` which can be used to stop a pipeline task while
keeping the frame processors running. The frame processors could then be used
in a different pipeline. The difference between a `StopFrame` and a
`StopTaskFrame` is that, as with `EndFrame` and `EndTaskFrame`, the
`StopFrame` is pushed from the task and the `StopTaskFrame` is pushed upstream
inside the pipeline by any processor.
- Added a new `PipelineTask` parameter `observers` that replaces the previous
`PipelineParams.observers`.
- Added a new `PipelineTask` parameter `check_dangling_tasks` to enable or
disable checking for frame processors' dangling tasks when the Pipeline
finishes running.
- Added new `on_completion_timeout` event for LLM services (all OpenAI-based
services, Anthropic and Google). Note that this event will only get triggered
if LLM timeouts are setup and if the timeout was reached. It can be useful to
retrigger another completion and see if the timeout was just a blip.
- Added new log observers `LLMLogObserver` and `TranscriptionLogObserver` that
can be useful for debugging your pipelines.
- Added `room_url` property to `DailyTransport`.
- Added `addons` argument to `DeepgramSTTService`.
- Added `exponential_backoff_time()` to `utils.network` module.
### Changed
- ⚠️ `PipelineTask` now requires keyword arguments (except for the first one for
the pipeline).
- Updated `PlayHTHttpTTSService` to take a `voice_engine` and `protocol` input
in the constructor. The previous method of providing a `voice_engine` input
that contains the engine and protocol is deprecated by PlayHT.
- The base `TTSService` class now strips leading newlines before sending text
to the TTS provider. This change is to solve issues where some TTS providers,
like Azure, would not output text due to newlines.
- `GrokLLMSService` now uses `grok-2` as the default model.
- `AnthropicLLMService` now uses `claude-3-7-sonnet-20250219` as the default
model.
- `RimeHttpTTSService` needs an `aiohttp.ClientSession` to be passed to the
constructor as all the other HTTP-based services.
- `RimeHttpTTSService` doesn't use a default voice anymore.
- `DeepgramSTTService` now uses the new `nova-3` model by default. If you want
to use the previous model you can pass `LiveOptions(model="nova-2-general")`.
(see https://deepgram.com/learn/introducing-nova-3-speech-to-text-api)
```python
stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
```
### Deprecated
- `PipelineParams.observers` is now deprecated, you the new `PipelineTask`
parameter `observers`.
### Removed
- Remove `TransportParams.audio_out_is_live` since it was not being used at all.
### Fixed
- Fixed a `GoogleLLMService` that was causing an exception when sending inline
audio in some cases.
- Fixed an `AudioContextWordTTSService` issue that would cause an `EndFrame` to
disconnect from the TTS service before audio from all the contexts was
received. This affected services like Cartesia and Rime.
- Fixed an issue that was not allowing to pass an `OpenAILLMContext` to create
`GoogleLLMService`'s context aggregators.
- Fixed a `ElevenLabsTTSService`, `FishAudioTTSService`, `LMNTTTSService` and
`PlayHTTTSService` issue that was resulting in audio requested before an
interruption being played after an interruption.
- Fixed `match_endofsentence` support for ellipses.
- Fixed an issue that would cause undesired interruptions via
`EmulateUserStartedSpeakingFrame` when only interim transcriptions (i.e. no
final transcriptions) where received.
- Fixed an issue where `EndTaskFrame` was not triggering
`on_client_disconnected` or closing the WebSocket in FastAPI.
- Fixed an issue in `DeepgramSTTService` where the `sample_rate` passed to the
`LiveOptions` was not being used, causing the service to use the default
sample rate of pipeline.
- Fixed a context aggregator issue that would not append the LLM text response
to the context if a function call happened in the same LLM turn.
- Fixed an issue that was causing HTTP TTS services to push `TTSStoppedFrame`
more than once.
- Fixed a `FishAudioTTSService` issue where `TTSStoppedFrame` was not being
pushed.
- Fixed an issue that `start_callback` was not invoked for some LLM services.
- Fixed an issue that would cause `DeepgramSTTService` to stop working after an
error occurred (e.g. sudden network loss). If the network recovered we would
not reconnect.
- Fixed a `STTMuteFilter` issue that would not mute user audio frames causing
transcriptions to be generated by the STT service.
### Other
- Added Gemini support to `examples/phone-chatbot`.
- Added foundational example `34-audio-recording.py` showing how to use the
AudioBufferProcessor callbacks to save merged and track recordings.
## [0.0.57] - 2025-02-14
### Added

View File

@@ -3,10 +3,10 @@ coverage~=7.6.12
grpcio-tools~=1.67.1
pip-tools~=7.4.1
pre-commit~=4.0.1
pyright~=1.1.394
pyright~=1.1.393
pytest~=8.3.4
pytest-asyncio~=0.25.3
ruff~=0.9.7
pytest-asyncio~=0.25.2
ruff~=0.9.5
setuptools~=70.0.0
setuptools_scm~=8.1.0
python-dotenv~=1.0.1

View File

@@ -18,9 +18,6 @@ AZURE_DALLE_API_KEY=...
AZURE_DALLE_ENDPOINT=https://...
AZURE_DALLE_MODEL=...
# Cartesia
CARTESIA_API_KEY=...
# Daily
DAILY_API_KEY=...
DAILY_SAMPLE_ROOM_URL=https://...

View File

@@ -1,60 +0,0 @@
# React Native Implementation
Basic implementation using the [Pipecat React Native SDK](https://docs.pipecat.ai/client/react-native/introduction).
## Usage
### Expo requirements
This project cannot be used with an [Expo Go](https://docs.expo.dev/workflow/expo-go/) app because [it requires custom native code](https://docs.expo.io/workflow/customizing/).
When a project requires custom native code or a config plugin, we need to transition from using [Expo Go](https://docs.expo.dev/workflow/expo-go/)
to a [development build](https://docs.expo.dev/development/introduction/).
More details about the custom native code used by this demo can be found in [rn-daily-js-expo-config-plugin](https://github.com/daily-co/rn-daily-js-expo-config-plugin).
### Building remotely
If you do not have experience with Xcode and Android Studio builds or do not have them installed locally on your computer, you will need to follow [this guide from Expo to use EAS Build](https://docs.expo.dev/development/create-development-builds/#create-and-install-eas-build).
### Building locally
You will need to have installed locally on your computer:
- [Xcode](https://developer.apple.com/xcode/) to build for iOS;
- [Android Studio](https://developer.android.com/studio) to build for Android;
#### Install the demo dependencies
```bash
# Use the version of node specified in .nvmrc
nvm i
# Install dependencies
npm i
# Before a native app can be compiled, the native source code must be generated.
npx expo prebuild
# Configure the environment variable to connect to the local server
cp env.example .env
# edit .env and add your local ip address, for example: http://192.168.1.16:7860
```
#### Running on Android
After plugging in an Android device [configured for debugging](https://developer.android.com/studio/debug/dev-options), run the following command:
```
npm run android
```
#### Running on iOS
Run the following command:
```
npm run ios
```
#### Connect to the server
Use the http://localhost:5173 in your app.

View File

@@ -1,75 +0,0 @@
{
"expo": {
"name": "bot-ready-rn",
"slug": "bot-ready-rn",
"version": "1.0.0",
"orientation": "portrait",
"icon": "./assets/icon.png",
"userInterfaceStyle": "light",
"splash": {
"image": "./assets/splash.png",
"resizeMode": "contain",
"backgroundColor": "#ffffff"
},
"updates": {
"fallbackToCacheTimeout": 0
},
"assetBundlePatterns": [
"**/*"
],
"ios": {
"supportsTablet": true,
"bitcode": false,
"bundleIdentifier": "co.daily.expo.BotReady",
"infoPlist": {
"UIBackgroundModes": [
"voip"
]
},
"appleTeamId": "EEBGKV9N3N"
},
"android": {
"adaptiveIcon": {
"foregroundImage": "./assets/adaptive-icon.png",
"backgroundColor": "#FFFFFF"
},
"package": "co.daily.expo.BotReady",
"permissions": [
"android.permission.ACCESS_NETWORK_STATE",
"android.permission.BLUETOOTH",
"android.permission.CAMERA",
"android.permission.INTERNET",
"android.permission.MODIFY_AUDIO_SETTINGS",
"android.permission.RECORD_AUDIO",
"android.permission.SYSTEM_ALERT_WINDOW",
"android.permission.WAKE_LOCK",
"android.permission.FOREGROUND_SERVICE",
"android.permission.FOREGROUND_SERVICE_CAMERA",
"android.permission.FOREGROUND_SERVICE_MICROPHONE",
"android.permission.FOREGROUND_SERVICE_MEDIA_PROJECTION",
"android.permission.POST_NOTIFICATIONS"
]
},
"web": {
"favicon": "./assets/favicon.png"
},
"plugins": [
"@config-plugins/react-native-webrtc",
"@daily-co/config-plugin-rn-daily-js",
[
"expo-build-properties",
{
"android": {
"minSdkVersion": 24,
"compileSdkVersion": 35,
"targetSdkVersion": 34,
"buildToolsVersion": "35.0.0"
},
"ios": {
"deploymentTarget": "15.1"
}
}
]
]
}
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 46 KiB

View File

@@ -1,7 +0,0 @@
module.exports = function(api) {
api.cache(true);
return {
presets: ['babel-preset-expo'],
plugins: [["module:react-native-dotenv"]],
};
};

View File

@@ -1 +0,0 @@
API_BASE_URL=http://YOUR_LOCAL_IP:7860

View File

@@ -1,7 +0,0 @@
import { registerRootComponent } from "expo";
import App from "./src/App";
// registerRootComponent calls AppRegistry.registerComponent('main', () => App);
// It also ensures that the environment is set up appropriately
registerRootComponent(App);

View File

@@ -1,4 +0,0 @@
// Learn more https://docs.expo.io/guides/customizing-metro
const { getDefaultConfig } = require('expo/metro-config');
module.exports = getDefaultConfig(__dirname);

File diff suppressed because it is too large Load Diff

View File

@@ -1,31 +0,0 @@
{
"name": "bot-ready-rn",
"version": "1.0.0",
"scripts": {
"start": "expo start --dev-client",
"android": "expo run:android --device",
"ios": "expo run:ios --device",
"web": "expo start --web"
},
"dependencies": {
"@config-plugins/react-native-webrtc": "^10.0.0",
"@daily-co/config-plugin-rn-daily-js": "0.0.7",
"@daily-co/react-native-daily-js": "^0.70.0",
"@daily-co/react-native-webrtc": "^118.0.3-daily.2",
"@react-native-async-storage/async-storage": "1.23.1",
"expo": "^52.0.0",
"expo-build-properties": "~0.13.1",
"expo-dev-client": "~5.0.5",
"expo-splash-screen": "~0.29.16",
"expo-status-bar": "~2.0.0",
"react": "18.3.1",
"react-native": "0.76.3",
"react-native-background-timer": "^2.4.1",
"react-native-dotenv": "^3.4.11",
"react-native-get-random-values": "^1.11.0"
},
"devDependencies": {
"@babel/core": "^7.12.9"
},
"private": true
}

View File

@@ -1,121 +0,0 @@
import React, { useState, useEffect } from 'react';
import {SafeAreaView, View, Text, Button, StyleSheet, ScrollView} from 'react-native';
import Daily from "@daily-co/react-native-daily-js";
import { API_BASE_URL } from "@env";
const CallScreen = () => {
const [connectionStatus, setConnectionStatus] = useState('Disconnected');
const [isConnected, setIsConnected] = useState(false);
const [callObject, setCallObject] = useState(null);
const [logs, setLogs] = useState([]);
useEffect(() => {
if (callObject) {
setupTrackListeners(callObject);
}
}, [callObject]);
const log = (message) => {
setLogs((prevLogs) => [...prevLogs, `${new Date().toISOString()} - ${message}`]);
console.log(message);
};
const setupTrackListeners = (callObject) => {
callObject.on("joined-meeting", () => {
setConnectionStatus('Connected');
setIsConnected(true);
log('Client connected');
});
callObject.on("left-meeting", () => {
setConnectionStatus('Disconnected');
setIsConnected(false);
log('Client disconnected');
});
callObject.on("participant-left", () => {
// When the bot leaves, we are also disconnecting from the call
disconnect().catch((err) => {
log(`Failed to disconnect ${err}`);
})
});
// Trigger so the bot can start sending audio
callObject.on("track-started", (evt) => {
if (evt.track.kind === "audio" && evt.participant.local === false) {
handleEventToConsole(evt)
log("Sending the message that will trigger the bot to play the audio.")
callObject.sendAppMessage("playable")
}
});
callObject.on("error", (evt) => log(`Error: ${evt.error}`));
// Other events just for awareness
callObject.on("track-stopped", handleEventToConsole);
callObject.on("participant-joined", handleEventToConsole);
callObject.on("participant-updated", handleEventToConsole);
};
const handleEventToConsole = (evt) => {
log(`Received event: ${evt.action}`);
};
const connect = async () => {
try {
const callObject = Daily.createCallObject({ subscribeToTracksAutomatically: true });
setCallObject(callObject);
const connectionUrl = `${API_BASE_URL}/connect`
const res = await fetch(connectionUrl, { method: "POST", headers: { "Content-Type": "application/json" } });
const roomInfo = await res.json();
await callObject.join({ url: roomInfo.room_url });
} catch (error) {
log(`Error connecting: ${error.message}`);
}
};
const disconnect = async () => {
if (callObject) {
try {
await callObject.leave();
await callObject.destroy();
setCallObject(null);
} catch (error) {
log(`Error disconnecting: ${error.message}`);
}
}
};
return (
<SafeAreaView style={styles.safeArea}>
<View style={styles.container}>
<View style={styles.statusBar}>
<Text>Status: <Text style={styles.status}>{connectionStatus}</Text></Text>
<View style={styles.controls}>
<Button
title={isConnected ? "Disconnect" : "Connect"}
onPress={isConnected ? disconnect : connect}
/>
</View>
</View>
<View style={styles.debugPanel}>
<Text style={styles.debugTitle}>Debug Info</Text>
<ScrollView style={styles.debugLog}>
{logs.map((logEntry, index) => (
<Text key={index} style={styles.logText}>{logEntry}</Text>
))}
</ScrollView>
</View>
</View>
</SafeAreaView>
);
};
const styles = StyleSheet.create({
safeArea: { flex: 1, backgroundColor: '#f0f0f0', padding: 20 },
container: { flex: 1, margin: 20 },
statusBar: { flexDirection: 'row', justifyContent: 'space-between', alignItems: 'center', padding: 10, backgroundColor: '#fff', borderRadius: 8, marginBottom: 20 },
status: { fontWeight: 'bold' },
controls: { flexDirection: 'row', gap: 10 },
debugPanel: { height: '80%', backgroundColor: '#fff', borderRadius: 8, padding: 20},
debugTitle: { fontSize: 16, fontWeight: 'bold' },
debugLog: { height: '100%', overflow: 'scroll', backgroundColor: '#f8f8f8', padding: 10, borderRadius: 4, fontFamily: 'monospace', fontSize: 12, lineHeight: 1.4 },
});
export default CallScreen;

View File

@@ -17,7 +17,7 @@ from runner import configure
from pipecat.frames.frames import AudioRawFrame, EndFrame, OutputAudioRawFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport

View File

@@ -113,13 +113,13 @@ async def main():
llm,
tts,
transport.output(),
canonical, # uploads audio buffer to Canonical AI for metrics
audio_buffer_processor, # captures audio into a buffer
canonical, # uploads audio buffer to Canonical AI for metrics
context_aggregator.assistant(),
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -32,16 +32,10 @@ load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
# Create the recordings directory if it doesn't exist
os.makedirs("recordings", exist_ok=True)
async def save_audio(audio: bytes, sample_rate: int, num_channels: int, name: str):
async def save_audio(audio: bytes, sample_rate: int, num_channels: int):
if len(audio) > 0:
filename = os.path.join(
"recordings",
f"{name}_conversation_recording{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav",
)
filename = f"conversation_recording{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
@@ -116,7 +110,7 @@ async def main():
# NOTE: Watch out! This will save all the conversation in memory. You
# can pass `buffer_size` to get periodic callbacks.
audiobuffer = AudioBufferProcessor(enable_turn_audio=True)
audiobuffer = AudioBufferProcessor()
pipeline = Pipeline(
[
@@ -130,19 +124,11 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
await save_audio(audio, sample_rate, num_channels, "full")
@audiobuffer.event_handler("on_user_turn_audio_data")
async def on_user_turn_audio_data(buffer, audio, sample_rate, num_channels):
await save_audio(audio, sample_rate, num_channels, "user")
@audiobuffer.event_handler("on_bot_turn_audio_data")
async def on_bot_turn_audio_data(buffer, audio, sample_rate, num_channels):
await save_audio(audio, sample_rate, num_channels, "bot")
await save_audio(audio, sample_rate, num_channels)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -70,7 +70,7 @@ async def main(room_url: str, token: str):
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -62,7 +62,7 @@ async def main(room_url: str, token: str):
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -18,7 +18,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.fal import FalImageGenService
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
load_dotenv(override=True)
@@ -33,9 +34,7 @@ async def main():
transport = TkLocalTransport(
tk_root,
TkTransportParams(
camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024
),
TransportParams(camera_out_enabled=True, camera_out_width=1024, camera_out_height=1024),
)
imagegen = FalImageGenService(

View File

@@ -44,8 +44,7 @@ async def main():
runner = PipelineRunner()
task = PipelineTask(
Pipeline([imagegen, transport.output()]),
params=PipelineParams(enable_metrics=True),
Pipeline([imagegen, transport.output()]), PipelineParams(enable_metrics=True)
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -30,7 +30,8 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.fal import FalImageGenService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport, TkOutputTransport
load_dotenv(override=True)
@@ -151,7 +152,7 @@ async def main():
transport = TkLocalTransport(
tk_root,
TkTransportParams(
TransportParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,

View File

@@ -105,10 +105,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
PipelineParams(enable_metrics=True, enable_usage_metrics=True),
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -127,7 +127,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -76,7 +76,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -79,7 +79,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -103,7 +103,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -9,6 +9,7 @@ import os
import sys
import aiohttp
from deepgram import LiveOptions
from dotenv import load_dotenv
from loguru import logger
from runner import configure
@@ -44,7 +45,23 @@ async def main():
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
# url=deepgram_url,
live_options=LiveOptions(
encoding="linear16",
language="en-US",
model="nova-3",
channels=1,
interim_results=True,
# smart_format=smart_format,
# endpointing=endpointing,
vad_events=True,
diarize=True,
filler_words=True,
),
)
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
@@ -74,7 +91,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -1,103 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.elevenlabs import ElevenLabsHttpTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = ElevenLabsHttpTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
aiohttp_session=session,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -75,7 +75,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -77,7 +77,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -83,7 +83,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -75,7 +75,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -21,7 +21,6 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transcriptions.language import Language
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -81,7 +80,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -71,7 +71,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -88,7 +88,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -81,7 +81,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -79,7 +79,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -80,7 +80,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -76,7 +76,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -1,103 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.services.rime import RimeHttpTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = RimeHttpTTSService(
api_key=os.getenv("RIME_API_KEY", ""),
voice_id="rex",
aiohttp_session=session,
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.cancel()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -251,7 +251,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -74,7 +74,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -78,11 +78,7 @@ async def main():
runner = PipelineRunner()
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=24000,
audio_out_sample_rate=24000,
),
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
)
await runner.run(task)

View File

@@ -24,7 +24,8 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.local.tk import TkLocalTransport, TkTransportParams
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.local.tk import TkLocalTransport
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -66,7 +67,7 @@ async def main():
tk_transport = TkLocalTransport(
tk_root,
TkTransportParams(
TransportParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
@@ -82,11 +83,7 @@ async def main():
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=24000,
audio_out_sample_rate=24000,
),
pipeline, PipelineParams(audio_in_sample_rate=24000, audio_out_sample_rate=24000)
)
async def run_tk():

View File

@@ -76,7 +76,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -11,10 +11,9 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -66,24 +65,30 @@ async def main():
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
)
]
messages = [
{
"role": "system",
@@ -107,7 +112,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -13,8 +13,6 @@ from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -61,18 +59,22 @@ async def main():
)
llm.register_function("get_weather", get_weather)
weather_function = FunctionSchema(
name="get_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
{
"name": "get_weather",
"description": "Get the current weather in a given location",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
}
},
"required": ["location"],
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function])
}
]
# todo: test with very short initial user message
@@ -97,13 +99,7 @@ async def main():
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -13,8 +13,6 @@ from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -74,29 +72,36 @@ async def main():
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
weather_function = FunctionSchema(
name="get_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
{
"name": "get_weather",
"description": "Get the current weather in a given location",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
}
},
"required": ["location"],
},
},
required=["location"],
)
get_image_function = FunctionSchema(
name="get_image",
description="Get an image from the video stream.",
properties={
"question": {
"type": "string",
"description": "The question that the user is asking about the image.",
}
{
"name": "get_image",
"description": "Get an image from the video stream.",
"input_schema": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question that the user is asking about the image.",
}
},
"required": ["question"],
},
},
required=["question"],
)
tools = ToolsSchema(standard_tools=[weather_function, get_image_function])
]
# todo: test with very short initial user message
@@ -148,13 +153,7 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -11,10 +11,9 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -70,23 +69,30 @@ async def main():
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
)
]
messages = [
{
"role": "system",

View File

@@ -11,10 +11,9 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -67,34 +66,47 @@ async def main():
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
weather_function = FunctionSchema(
name="get_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
),
ChatCompletionToolParam(
type="function",
function={
"name": "get_image",
"description": "Get an image from the video stream.",
"parameters": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question to ask the AI to generate an image of",
},
},
"required": ["question"],
},
},
},
required=["location"],
)
get_image_function = FunctionSchema(
name="get_image",
description="Get an image from the video stream.",
properties={
"question": {
"type": "string",
"description": "The question that the user is asking about the image.",
}
},
required=["question"],
)
tools = ToolsSchema(standard_tools=[weather_function, get_image_function])
),
]
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.

View File

@@ -13,10 +13,7 @@ from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -33,12 +30,6 @@ logger.add(sys.stderr, level="DEBUG")
video_participant_id = None
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
@@ -72,37 +63,48 @@ async def main():
)
llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY"), model="gemini-2.0-flash-001")
llm.register_function("get_weather", get_weather, start_fetch_weather)
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
weather_function = FunctionSchema(
name="get_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
get_image_function = FunctionSchema(
name="get_image",
description="Get an image from the video stream.",
properties={
"question": {
"type": "string",
"description": "The question that the user is asking about the image.",
}
},
required=["question"],
)
tools = ToolsSchema(standard_tools=[weather_function, get_image_function])
tools = [
{
"function_declarations": [
{
"name": "get_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
{
"name": "get_image",
"description": "Get and image from the camera or video stream.",
"parameters": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question to to use when running inference on the acquired image.",
},
},
"required": ["question"],
},
},
]
}
]
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
@@ -143,7 +145,7 @@ indicate you should use the get_image tool are:
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -11,10 +11,9 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -69,23 +68,30 @@ async def main():
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location"],
},
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function])
)
]
messages = [
{
"role": "system",
@@ -110,7 +116,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -11,10 +11,9 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -67,23 +66,30 @@ async def main():
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
)
]
messages = [
{
"role": "system",
@@ -107,7 +113,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -11,10 +11,9 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -71,23 +70,30 @@ async def main():
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
)
]
messages = [
{
"role": "system",
@@ -111,7 +117,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -11,10 +11,9 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -70,23 +69,30 @@ async def main():
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
)
]
messages = [
{
"role": "system",
@@ -110,7 +116,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -11,10 +11,9 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -70,23 +69,30 @@ async def main():
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Returns the current weather at a location, if one is specified, and defaults to the user's location.",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The location to find the weather of, or if not provided, it's the default location.",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "Whether to use SI or USCS units (celsius or fahrenheit).",
},
},
"required": ["location", "format"],
},
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
)
]
messages = [
{
"role": "system",
@@ -110,7 +116,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -11,10 +11,9 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -67,23 +66,30 @@ async def main():
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather for a specific location. You MUST use this function whenever asked about weather.",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Use fahrenheit for US locations, celsius for others.",
},
},
"required": ["location", "format"],
},
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
)
]
messages = [
{
"role": "system",
@@ -117,7 +123,7 @@ Start by asking me for my location. Then, use 'get_weather_current' to give me a
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -11,10 +11,9 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -67,23 +66,30 @@ async def main():
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather for a specific location. You MUST use this function whenever asked about weather.",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Use fahrenheit for US locations, celsius for others.",
},
},
"required": ["location", "format"],
},
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
)
]
messages = [
{
"role": "system",
@@ -117,7 +123,7 @@ Start by asking me for my location. Then, use 'get_weather_current' to give me a
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -11,10 +11,9 @@ import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -71,23 +70,30 @@ async def main():
# sent to the same callback with an additional function_name parameter.
llm.register_function(None, fetch_weather_from_api, start_callback=start_fetch_weather)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
tools = [
ChatCompletionToolParam(
type="function",
function={
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
)
]
messages = [
{
"role": "system",
@@ -111,7 +117,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -83,7 +83,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -1,131 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.google import GoogleLLMOpenAIBetaService
from pipecat.services.openai import OpenAILLMContext
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def start_fetch_weather(function_name, llm, context):
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
await result_callback({"conditions": "nice", "temperature": "75"})
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = ElevenLabsTTSService(
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = GoogleLLMOpenAIBetaService(api_key=os.getenv("GEMINI_API_KEY"))
# Register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(
"get_current_weather", fetch_weather_from_api, start_callback=start_fetch_weather
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
messages = [
{
"role": "user",
"content": "Start a conversation with 'Hey there' to get the current weather.",
},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -133,7 +133,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -126,7 +126,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -85,13 +85,7 @@ async def main():
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
# When a participant joins, start transcription for that participant so the
# bot can "hear" and respond to them.

View File

@@ -108,7 +108,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,

View File

@@ -38,6 +38,7 @@ async def main():
"GStreamer",
DailyParams(
audio_out_enabled=True,
audio_out_is_live=True,
camera_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,

View File

@@ -154,7 +154,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -1,179 +0,0 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
from datetime import datetime
import aiohttp
import websockets
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai_realtime_beta import (
AzureRealtimeBetaLLMService,
InputAudioTranscription,
SessionProperties,
TurnDetection,
)
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
tools = [
{
"type": "function",
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
}
]
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
vad_audio_passthrough=True,
),
)
session_properties = SessionProperties(
input_audio_transcription=InputAudioTranscription(),
# Set openai TurnDetection parameters. Not setting this at all will turn it
# on by default
# turn_detection=TurnDetection(silence_duration_ms=1000),
# Or set to False to disable openai turn detection and use transport VAD
# turn_detection=False,
# tools=tools,
instructions="""Your knowledge cutoff is 2023-10. You are a helpful and friendly AI.
Act like a human, but remember that you aren't a human and that you can't do human
things in the real world. Your voice and personality should be warm and engaging, with a lively and
playful tone.
If interacting in a non-English language, start by using the standard accent or dialect familiar to
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
even if you're asked about them.
-
You are participating in a voice conversation. Keep your responses concise, short, and to the point
unless specifically asked to elaborate on a topic.
Remember, your responses should be short. Just one or two sentences, usually.""",
)
llm = AzureRealtimeBetaLLMService(
api_key=os.getenv("AZURE_REALTIME_API_KEY"),
base_url=os.getenv("AZURE_REALTIME_BASE_URL"),
session_properties=session_properties,
start_audio_paused=False,
)
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
# Create a standard OpenAI LLM context object using the normal messages format. The
# OpenAIRealtimeBetaLLMService will convert this internally to messages that the
# openai WebSocket API can understand.
context = OpenAILLMContext(
[{"role": "user", "content": "Say hello!"}],
# [{"role": "user", "content": [{"type": "text", "text": "Say hello!"}]}],
# [
# {
# "role": "user",
# "content": [
# {"type": "text", "text": "Say"},
# {"type": "text", "text": "yo what's up!"},
# ],
# }
# ],
tools,
)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -212,7 +212,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -237,7 +237,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -209,7 +209,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -263,7 +263,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -87,7 +87,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
# We just use 16000 because that's what Tavus is expecting and
# we avoid resampling.
audio_in_sample_rate=16000,

View File

@@ -145,7 +145,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -138,7 +138,6 @@ class OutputGate(FrameProcessor):
self._gate_open = start_open
self._frames_buffer = []
self._notifier = notifier
self._gate_task = None
def close_gate(self):
self._gate_open = False
@@ -179,13 +178,10 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
if not self._gate_task:
self._gate_task = self.create_task(self._gate_task_handler())
self._gate_task = self.create_task(self._gate_task_handler())
async def _stop(self):
if self._gate_task:
await self.cancel_task(self._gate_task)
self._gate_task = None
await self.cancel_task(self._gate_task)
async def _gate_task_handler(self):
while True:
@@ -355,7 +351,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -342,7 +342,6 @@ class OutputGate(FrameProcessor):
self._gate_open = start_open
self._frames_buffer = []
self._notifier = notifier
self._gate_task = None
def close_gate(self):
self._gate_open = False
@@ -383,13 +382,10 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
if not self._gate_task:
self._gate_task = self.create_task(self._gate_task_handler())
self._gate_task = self.create_task(self._gate_task_handler())
async def _stop(self):
if self._gate_task:
await self.cancel_task(self._gate_task)
self._gate_task = None
await self.cancel_task(self._gate_task)
async def _gate_task_handler(self):
while True:
@@ -564,7 +560,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -23,9 +23,12 @@ from pipecat.frames.frames import (
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InputAudioRawFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesFrame,
StartFrame,
StartInterruptionFrame,
StopInterruptionFrame,
SystemFrame,
TextFrame,
TranscriptionFrame,
@@ -36,7 +39,7 @@ from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator
from pipecat.processors.aggregators.llm_response import LLMResponseAggregator
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
@@ -388,7 +391,7 @@ class AudioAccumulator(FrameProcessor):
)
self._user_speaking = False
context = GoogleLLMContext()
context.add_audio_frames_message(audio_frames=self._audio_frames)
context.add_audio_frames_message(text="Audio follows", audio_frames=self._audio_frames)
await self.push_frame(OpenAILLMContextFrame(context=context))
elif isinstance(frame, InputAudioRawFrame):
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest
@@ -431,11 +434,7 @@ class CompletenessCheck(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, (EndFrame, CancelFrame)):
if self._idle_task:
await self.cancel_task(self._idle_task)
self._idle_task = None
elif isinstance(frame, UserStartedSpeakingFrame):
if isinstance(frame, UserStartedSpeakingFrame):
if self._idle_task:
await self.cancel_task(self._idle_task)
elif isinstance(frame, TextFrame) and frame.text.startswith("YES"):
@@ -477,11 +476,19 @@ class CompletenessCheck(FrameProcessor):
self._idle_task = None
class LLMAggregatorBuffer(LLMAssistantResponseAggregator):
class UserAggregatorBuffer(LLMResponseAggregator):
"""Buffers the output of the transcription LLM. Used by the bot output gate."""
def __init__(self, **kwargs):
super().__init__(expect_stripped_words=False)
super().__init__(
messages=None,
role=None,
start_frame=LLMFullResponseStartFrame,
end_frame=LLMFullResponseEndFrame,
accumulator_frame=TextFrame,
handle_interruptions=True,
expect_stripped_words=False,
)
self._transcription = ""
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -539,7 +546,7 @@ class OutputGate(FrameProcessor):
self,
notifier: BaseNotifier,
context: OpenAILLMContext,
llm_transcription_buffer: LLMAggregatorBuffer,
user_transcription_buffer: "UserAggregatorBuffer",
**kwargs,
):
super().__init__(**kwargs)
@@ -547,8 +554,7 @@ class OutputGate(FrameProcessor):
self._frames_buffer = []
self._notifier = notifier
self._context = context
self._transcription_buffer = llm_transcription_buffer
self._gate_task = None
self._transcription_buffer = user_transcription_buffer
def close_gate(self):
self._gate_open = False
@@ -596,13 +602,10 @@ class OutputGate(FrameProcessor):
async def _start(self):
self._frames_buffer = []
if not self._gate_task:
self._gate_task = self.create_task(self._gate_task_handler())
self._gate_task = self.create_task(self._gate_task_handler())
async def _stop(self):
if self._gate_task:
await self.cancel_task(self._gate_task)
self._gate_task = None
await self.cancel_task(self._gate_task)
async def _gate_task_handler(self):
while True:
@@ -694,10 +697,10 @@ async def main():
conversation_audio_context_assembler = ConversationAudioContextAssembler(context=context)
llm_aggregator_buffer = LLMAggregatorBuffer()
user_aggregator_buffer = UserAggregatorBuffer()
bot_output_gate = OutputGate(
notifier=notifier, context=context, llm_transcription_buffer=llm_aggregator_buffer
notifier=notifier, context=context, user_transcription_buffer=user_aggregator_buffer
)
pipeline = Pipeline(
@@ -718,7 +721,7 @@ async def main():
],
[
tx_llm,
llm_aggregator_buffer,
user_aggregator_buffer,
],
)
],
@@ -737,7 +740,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -87,7 +87,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -122,7 +122,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -354,7 +354,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -63,7 +63,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -89,7 +89,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -14,8 +14,6 @@ from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
@@ -43,6 +41,32 @@ async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context
)
tools = [
{
"function_declarations": [
{
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
]
}
]
system_instruction = """
You are a helpful assistant who can answer questions and use tools.
@@ -71,27 +95,6 @@ async def main():
),
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
search_tool = {"google_search": {}}
tools = ToolsSchema(
standard_tools=[weather_function], custom_tools={AdapterType.GEMINI: [search_tool]}
)
llm = GeminiMultimodalLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
@@ -117,7 +120,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -79,7 +79,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -106,7 +106,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2024-2025, Daily
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
@@ -34,7 +34,7 @@ search_tool = {"google_search": {}}
tools = [search_tool]
system_instruction = """
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
Always use the google search API to retrieve the latest news. You must also use it to check which day is today.
@@ -93,7 +93,7 @@ async def main():
]
)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):

View File

@@ -83,7 +83,7 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
),

Some files were not shown because too many files have changed in this diff Show More