Compare commits

..

6 Commits

Author SHA1 Message Date
Jon Taylor
5bd5d22270 removed space from event handler 2024-06-26 18:30:56 +01:00
Jon Taylor
6ee7932337 added pause to start and new intro prompt 2024-06-26 18:24:14 +01:00
Jon Taylor
c407445dd1 removed header comment from bot runner 2024-06-24 17:35:26 +01:00
Jon Taylor
447f37167e added VAD stop seconds env 2024-06-24 17:34:25 +01:00
Jon Taylor
354c21500e prompt tweaks 2024-06-24 17:28:10 +01:00
Jon Taylor
5728e25b5a added fastbot example 2024-06-24 16:25:36 +01:00
145 changed files with 3298 additions and 11588 deletions

View File

@@ -1,6 +1,10 @@
name: publish-test
on: workflow_dispatch
on:
workflow_dispatch:
push:
branches:
- main
jobs:
build:
@@ -10,6 +14,7 @@ jobs:
- name: Checkout repo
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.gitref }}
fetch-tags: true
fetch-depth: 100
- name: Set up Python

View File

@@ -5,262 +5,6 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- `ElevenLabsTTSService` can now specify ElevenLabs input parameters such as
`output_format`.
- `TwilioFrameSerializer` can now specify Twilio's and Pipecat's desired sample
rates to use.
- Added new `on_participant_updated` event to `DailyTransport`.
- Added `DailyRESTHelper.delete_room_by_name()`.
- Added LLM and TTS usage metrics. Those will be enabled by when
`enable_usage_metrics` is True.
- `AudioRawFrame`s are not pushed downstream from the base output
transport. This allows capturing the exact words the bot says by adding an STT
service at the end of the pipeline.
- Added new `GStreamerPipelineSource`. This processor can generate image or
audio frames from a GStreamer pipeline (e.g. reading an MP4 file, and RTP
stream or anything supported by GStreamer).
- Added `TransportParams.audio_out_is_live`. This flag is False by default and
it is useful to indicate we should not synchronize audio with sporadic images.
- Added new `BotStartedSpeakingFrame` and `BotStoppedSpeakingFrame` control
frames. These frames are pushed upstream and they should wrap
`BotSpeakingFrame`.
- Transports now allow you to register event handlers without decorators.
### Changed
- `ElevenLabsTTSService` now uses `eleven_turbo_v2_5` model by default.
- `BotSpeakingFrame` is now a control frame.
- `StartFrame` is now a control frame similar to `EndFrame`.
- `DeepgramTTSService` now is more customizable. You can adjust the encoding and
sample rate.
### Fixed
- Fixed and issue with `DailyRESTHelper.create_room()` expirations which would
cause this function to stop working after the initial expiration elapsed.
- Improved `EndFrame` and `CancelFrame` handling. `EndFrame` should end things
gracefully while a `CancelFrame` should cancel all running tasks as soon as
possible.
- Fixed an issue in `AIService` that would cause a yielded `None` value to be
processed.
- RTVI's `bot-ready` message is now sent when the RTVI pipeline is ready and
a first participant joins.
- Fixed a `BaseInputTransport` issue that was causing incoming system frames to
be queued instead of being pushed immediately.
- Fixed a `BaseInputTransport` issue that was causing start/stop interruptions
incoming frames to not cancel tasks and be processed properly.
### Other
- Added examples `foundational/18-gstreamer-filesrc.py` and
`foundational/18a-gstreamer-videotestsrc.py` that show how to use
`GStreamerPipelineSource`
- Remove `requests` library usage.
- Cleanup examples and use `DailyRESTHelper`.
## [0.0.39] - 2024-07-23
### Fixed
- Fixed a regression introduced in 0.0.38 that would cause Daily transcription
to stop the Pipeline.
## [0.0.38] - 2024-07-23
### Added
- Added `force_reload`, `skip_validation` and `trust_repo` to `SileroVAD` and
`SileroVADAnalyzer`. This allows caching and various GitHub repo validations.
- Added `send_initial_empty_metrics` flag to `PipelineParams` to request for
initial empty metrics (zero values). True by default.
### Fixed
- Fixed initial metrics format. It was using the wrong keys name/time instead of
processor/value.
- STT services should be using ISO 8601 time format for transcription frames.
- Fixed an issue that would cause Daily transport to show a stop transcription
error when actually none occurred.
## [0.0.37] - 2024-07-22
### Added
- Added `RTVIProcessor` which implements the RTVI-AI standard.
See https://github.com/rtvi-ai
- Added `BotInterruptionFrame` which allows interrupting the bot while talking.
- Added `LLMMessagesAppendFrame` which allows appending messages to the current
LLM context.
- Added `LLMMessagesUpdateFrame` which allows changing the LLM context for the
one provided in this new frame.
- Added `LLMModelUpdateFrame` which allows updating the LLM model.
- Added `TTSSpeakFrame` which causes the bot say some text. This text will not
be part of the LLM context.
- Added `TTSVoiceUpdateFrame` which allows updating the TTS voice.
### Removed
- We remove the `LLMResponseStartFrame` and `LLMResponseEndFrame` frames. These
were added in the past to properly handle interruptions for the
`LLMAssistantContextAggregator`. But the `LLMContextAggregator` is now based
on `LLMResponseAggregator` which handles interruptions properly by just
processing the `StartInterruptionFrame`, so there's no need for these extra
frames any more.
### Fixed
- Fixed an issue with `StatelessTextTransformer` where it was pushing a string
instead of a `TextFrame`.
- `TTSService` end of sentence detection has been improved. It now works with
acronyms, numbers, hours and others.
- Fixed an issue in `TTSService` that would not properly flush the current
aggregated sentence if an `LLMFullResponseEndFrame` was found.
### Performance
- `CartesiaTTSService` now uses websockets which improves speed. It also
leverages the new Cartesia contexts which maintains generated audio prosody
when multiple inputs are sent, therefore improving audio quality a lot.
## [0.0.36] - 2024-07-02
### Added
- Added `GladiaSTTService`.
See https://docs.gladia.io/chapters/speech-to-text-api/pages/live-speech-recognition
- Added `XTTSService`. This is a local Text-To-Speech service.
See https://github.com/coqui-ai/TTS
- Added `UserIdleProcessor`. This processor can be used to wait for any
interaction with the user. If the user doesn't say anything within a given
timeout a provided callback is called.
- Added `IdleFrameProcessor`. This processor can be used to wait for frames
within a given timeout. If no frame is received within the timeout a provided
callback is called.
- Added new frame `BotSpeakingFrame`. This frame will be continuously pushed
upstream while the bot is talking.
- It is now possible to specify a Silero VAD version when using `SileroVADAnalyzer`
or `SileroVAD`.
- Added `AysncFrameProcessor` and `AsyncAIService`. Some services like
`DeepgramSTTService` need to process things asynchronously. For example, audio
is sent to Deepgram but transcriptions are not returned immediately. In these
cases we still require all frames (except system frames) to be pushed
downstream from a single task. That's what `AsyncFrameProcessor` is for. It
creates a task and all frames should be pushed from that task. So, whenever a
new Deepgram transcription is ready that transcription will also be pushed
from this internal task.
- The `MetricsFrame` now includes processing metrics if metrics are enabled. The
processing metrics indicate the time a processor needs to generate all its
output. Note that not all processors generate these kind of metrics.
### Changed
- `WhisperSTTService` model can now also be a string.
- Added missing * keyword separators in services.
### Fixed
- `WebsocketServerTransport` doesn't try to send frames anymore if serializers
returns `None`.
- Fixed an issue where exceptions that occurred inside frame processors were
being swallowed and not displayed.
- Fixed an issue in `FastAPIWebsocketTransport` where it would still try to send
data to the websocket after being closed.
### Other
- Added Fly.io deployment example in `examples/deployment/flyio-example`.
- Added new `17-detect-user-idle.py` example that shows how to use the new
`UserIdleProcessor`.
## [0.0.35] - 2024-06-28
### Changed
- `FastAPIWebsocketParams` now require a serializer.
- `TwilioFrameSerializer` now requires a `streamSid`.
### Fixed
- Silero VAD number of frames needs to be 512 for 16000 sample rate or 256 for
8000 sample rate.
## [0.0.34] - 2024-06-25
### Fixed
- Fixed an issue with asynchronous STT services (Deepgram and Azure) that could
interruptions to ignore transcriptions.
- Fixed an issue introduced in 0.0.33 that would cause the LLM to generate
shorter output.
## [0.0.33] - 2024-06-25
### Changed
- Upgraded to Cartesia's new Python library 1.0.0. `CartesiaTTSService` now
expects a voice ID instead of a voice name (you can get the voice ID from
Cartesia's playground). You can also specify the audio `sample_rate` and
`encoding` instead of the previous `output_format`.
### Fixed
- Fixed an issue with asynchronous STT services (Deepgram and Azure) that could
cause static audio issues and interruptions to not work properly when dealing
with multiple LLMs sentences.
- Fixed an issue that could mix new LLM responses with previous ones when
handling interruptions.
- Fixed a Daily transport blocking situation that occurred while reading audio
frames after a participant left the room. Needs daily-python >= 0.10.1.
## [0.0.32] - 2024-06-22
### Added

View File

@@ -39,7 +39,7 @@ pip install "pipecat-ai[option,...]"
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **AI services**: `anthropic`, `azure`, `deepgram`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`
- **Transports**: `local`, `websocket`, `daily`
## Code examples
@@ -70,8 +70,8 @@ async def main():
transport = DailyTransport(
room_url=...,
token=...,
bot_name="Bot Name",
params=DailyParams(audio_out_enabled=True))
"Bot Name",
DailyParams(audio_out_enabled=True))
# Use Eleven Labs for Text-to-Speech
tts = ElevenLabsTTSService(
@@ -125,7 +125,7 @@ Sign up [here](https://dashboard.daily.co/u/signup) and [create a room](https://
Voice Activity Detection — very important for knowing when a user has finished speaking to your bot. If you are not using press-to-talk, and want Pipecat to detect when the user has finished talking, VAD is an essential component for a natural feeling conversation.
Pipecat makes use of WebRTC VAD by default when using a WebRTC transport layer. Optionally, you can use Silero VAD for improved accuracy at the cost of higher CPU usage.
Pipecast makes use of WebRTC VAD by default when using a WebRTC transport layer. Optionally, you can use Silero VAD for improved accuracy at the cost of higher CPU usage.
```shell
pip install pipecat-ai[silero]

View File

@@ -4,5 +4,5 @@ grpcio-tools~=1.62.2
pip-tools~=7.4.1
pyright~=1.1.367
pytest~=8.2.0
setuptools~=71.1.0
setuptools~=69.5.1
setuptools_scm~=8.1.0

View File

@@ -27,9 +27,6 @@ FAL_KEY=...
# Fireworks
FIREWORKS_API_KEY=...
# Gladia
GLADIA_API_KEY=...
# PlayHT
PLAY_HT_USER_ID=...
PLAY_HT_API_KEY=...

View File

@@ -1,13 +0,0 @@
FROM python:3.11-bullseye
# Open port 7860 for http service
ENV FAST_API_PORT=7860
EXPOSE 7860
# Install Python dependencies
COPY *.py .
COPY ./requirements.txt requirements.txt
RUN pip3 install --no-cache-dir --upgrade -r requirements.txt
# Start the FastAPI server
CMD python3 bot_runner.py --port ${FAST_API_PORT}

View File

@@ -1,39 +0,0 @@
# Fly.io deployment example
This project modifies the `bot_runner.py` server to launch a new machine for each user session. This is a recommended approach for production vs. running shell processess as your deployment will quickly run out of system resources under load.
For this example, we are using Daily as a WebRTC transport and provisioning a new room and token for each session. You can use another transport, such as WebSockets, by modifying the `bot.py` and `bot_runner.py` files accordingly.
## Setting up your fly.io deployment
### Create your fly.toml file
You can copy the `example-fly.toml` as a reference. Be sure to change the app name to something unique.
### Create your .env file
Copy the base `env.example` to `.env` and enter the necessary API keys.
`FLY_APP_NAME` should match that in the `fly.toml` file.
### Launch a new fly.io project
`fly launch` or `fly launch --org your-org-name`
### Set the necessary app secrets from your .env
Note: you can do this manually via the fly.io dashboard under the "secrets" sub-section of your deployment (e.g. "https://fly.io/apps/fly-app-name/secrets") or run the following terminal command:
`cat .env | tr '\n' ' ' | xargs flyctl secrets set`
### Deploy your machine
`fly deploy`
## Connecting to your bot
Send a post request to your running fly.io instance:
`curl --location --request POST 'https://YOUR_FLY_APP_NAME/start_bot'`
This request will wait until the machine enters into a `starting` state, before returning the a room URL and token to join.

View File

@@ -1,103 +0,0 @@
import asyncio
import aiohttp
import os
import sys
import argparse
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
async def main(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Bot")
parser.add_argument("-u", type=str, help="Room URL")
parser.add_argument("-t", type=str, help="Token")
config = parser.parse_args()
asyncio.run(main(config.u, config.t))

View File

@@ -1,215 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import argparse
import subprocess
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams)
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = [
'DAILY_API_KEY',
'OPENAI_API_KEY',
'ELEVENLABS_API_KEY',
'ELEVENLABS_VOICE_ID',
'FLY_API_KEY',
'FLY_APP_NAME',]
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "pipecat-fly-example")
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
FLY_HEADERS = {
'Authorization': f"Bearer {FLY_API_KEY}",
'Content-Type': 'application/json'
}
daily_helpers = {}
# ----------------- API ----------------- #
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)
# ----------------- Main ----------------- #
async def spawn_fly_machine(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
# Use the same image as the bot runner
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Unable to get machine info from Fly: {text}")
data = await r.json()
image = data[0]['config']['image']
# Machine configuration
cmd = f"python3 bot.py -u {room_url} -t {token}"
cmd = cmd.split()
worker_props = {
"config": {
"image": image,
"auto_destroy": True,
"init": {
"cmd": cmd
},
"restart": {
"policy": "no"
},
"guest": {
"cpu_kind": "shared",
"cpus": 1,
"memory_mb": 1024
}
},
}
# Spawn a new machine instance
async with session.post(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Problem starting a bot worker: {text}")
data = await r.json()
# Wait for the machine to enter the started state
vm_id = data['id']
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started", headers=FLY_HEADERS) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Bot was unable to enter started state: {text}")
print(f"Machine joined room: {room_url}")
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
try:
data = await request.json()
# Is this a webhook creation request?
if "test" in data:
return JSONResponse({"test": True})
except Exception as e:
pass
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
try:
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Launch a new fly.io machine, or run as a shell process (not recommended)
run_as_process = os.getenv("RUN_AS_PROCESS", False)
if run_as_process:
try:
subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
else:
try:
await spawn_fly_machine(room.url, token)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to spawn VM: {e}")
# Grab a token for the user to join with
user_token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
if __name__ == "__main__":
# Check environment variables
for env_var in REQUIRED_ENV_VARS:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=False, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")

View File

@@ -1,8 +0,0 @@
DAILY_API_KEY=
DAILY_SAMPLE_ROOM_URL= # Enter a Daily room URL to use a set room URL each time (useful for local testing)
OPENAI_API_KEY=
ELEVENLABS_API_KEY=
ELEVENLABS_VOICE_ID=
FLY_API_KEY=
FLY_APP_NAME=
RUN_AS_PROCESS= # Spawn fly.io machine for each session or run as local process

View File

@@ -1,25 +0,0 @@
# fly.toml app configuration file generated for pipecat-fly-example on 2024-07-01T15:04:53+01:00
#
# See https://fly.io/docs/reference/configuration/ for information about how to use this file.
#
app = 'pipecat-fly-example'
primary_region = 'sjc'
[build]
[env]
FLY_APP_NAME = 'pipecat-fly-example'
[http_service]
internal_port = 7860
force_https = true
auto_stop_machines = true
auto_start_machines = true
min_machines_running = 0
processes = ['app']
[[vm]]
memory = 512
cpu_kind = 'shared'
cpus = 1

View File

@@ -1,5 +0,0 @@
pipecat-ai[daily,openai,silero]
fastapi
uvicorn
python-dotenv
loguru

View File

@@ -6,27 +6,15 @@ provisioning a room and starting a Pipecat bot in response.
Refer to README for more information.
"""
import aiohttp
import os
import argparse
import subprocess
from contextlib import asynccontextmanager
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomSipParams, DailyRoomParams
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, PlainTextResponse
from twilio.twiml.voice_response import VoiceResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper,
DailyRoomObject,
DailyRoomProperties,
DailyRoomSipParams,
DailyRoomParams)
from dotenv import load_dotenv
load_dotenv(override=True)
@@ -37,23 +25,14 @@ MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = ['OPENAI_API_KEY', 'DAILY_API_KEY',
'ELEVENLABS_API_KEY', 'ELEVENLABS_VOICE_ID']
daily_helpers = {}
daily_rest_helper = DailyRESTHelper(
os.getenv("DAILY_API_KEY", ""),
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
# ----------------- API ----------------- #
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
@@ -74,7 +53,7 @@ action using the Twilio Client library.
"""
async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties(
@@ -89,13 +68,14 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
)
print(f"Creating new room...")
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
else:
# Check passed room URL exist (we assume that it already has a sip set up!)
try:
print(f"Joining existing room: {room_url}")
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
room: DailyRoomObject = daily_rest_helper.get_room_from_url(
room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
@@ -103,7 +83,7 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
print(f"Daily room: {room.url} {room.config.sip_endpoint}")
# Give the agent a token to join the session
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
@@ -112,11 +92,11 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
if vendor == "daily":
bot_proc = f"python3 - m bot_daily - u {room.url} - t {token} - i {
callId} - d {callDomain}"
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {
callId} -d {callDomain}"
else:
bot_proc = f"python3 - m bot_twilio - u {room.url} - t {
token} - i {callId} - s {room.config.sip_endpoint}"
bot_proc = f"python3 -m bot_twilio -u {room.url} -t {
token} -i {callId} -s {room.config.sip_endpoint}"
try:
subprocess.Popen(
@@ -160,7 +140,8 @@ async def twilio_start_bot(request: Request):
# create room and tell the bot to join the created room
# note: Twilio does not require a callDomain
room: DailyRoomObject = await _create_daily_room(room_url, callId, None, "twilio")
room: DailyRoomObject = _create_daily_room(
room_url, callId, None, "twilio")
print(f"Put Twilio on hold...")
# We have the room and the SIP URI,
@@ -197,7 +178,8 @@ async def daily_start_bot(request: Request) -> JSONResponse:
detail="Missing properties 'callId' or 'callDomain'")
print(f"CallId: {callId}, CallDomain: {callDomain}")
room: DailyRoomObject = await _create_daily_room(room_url, callId, callDomain, "daily")
room: DailyRoomObject = _create_daily_room(
room_url, callId, callDomain, "daily")
# Grab a token for the user to join with
return JSONResponse({

View File

@@ -1,5 +1,7 @@
pipecat-ai[daily,openai,silero]
fastapi
uvicorn
requests
python-dotenv
twilio
loguru
twilio

165
examples/fast-chatbot/.gitignore vendored Normal file
View File

@@ -0,0 +1,165 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml
# custom script to recursively upgrade items in requirements.py
upgrade_requirements.py
.DS_Store

View File

@@ -0,0 +1,164 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from loguru import logger
import argparse
import asyncio
import aiohttp
import os
import sys
import time
from typing import Optional
from pydantic import BaseModel, ValidationError
from pipecat.vad.vad_analyzer import VADParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.pipeline import Pipeline
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator
)
from helpers import (
ClearableDeepgramTTSService,
AudioVolumeTimer,
TranscriptionTimingLogger
)
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "DEBUG"))
class BotSettings(BaseModel):
room_url: str
room_token: str
bot_name: str = "Pipecat"
prompt: Optional[str] = "You are a helpful assistant."
deepgram_api_key: Optional[str] = os.getenv("DEEPGRAM_API_KEY", None)
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE", "aura-asteria-en")
deepgram_tts_base_url: Optional[str] = os.getenv(
"DEEPGRAM_TTS_BASE_URL", "https://api.deepgram.com/v1/speak")
deepgram_stt_base_url: Optional[str] = os.getenv(
"DEEPGRAM_STT_BASE_URL", "https://api.deepgram.com/v1/speak")
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY", None),
openai_model: Optional[str] = os.getenv("OPENAI_MODEL", None),
openai_base_url: Optional[str] = os.getenv("OPENAI_BASE_URL", None)
vad_stop_secs: Optional[float] = os.getenv("VAD_STOP_SECS", 0.200)
async def main(settings: BotSettings):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
settings.room_url,
settings.room_token,
settings.bot_name,
DailyParams(
audio_out_enabled=True,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(
stop_secs=settings.vad_stop_secs
)),
vad_audio_passthrough=True
)
)
stt = DeepgramSTTService(
name="STT",
api_key=settings.deepgram_api_key,
url=settings.deepgram_stt_base_url
)
tts = ClearableDeepgramTTSService(
name="Voice",
aiohttp_session=session,
api_key=settings.deepgram_api_key,
voice=settings.deepgram_voice,
**({'base_url': url} if (url := settings.deepgram_tts_base_url) else {})
)
llm = OpenAILLMService(
name="LLM",
api_key=settings.openai_api_key,
model=settings.openai_model,
base_url=settings.openai_base_url,
)
messages = [
{
"role": "system",
"content": settings.prompt,
},
]
avt = AudioVolumeTimer()
tl = TranscriptionTimingLogger(avt)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
avt, # Audio volume timer
stt, # Speech-to-text
tl, # Transcription timing logger
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True
))
# When the participant leaves, we exit the bot.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# Provide some air whilst tracks subscribe
time.sleep(2)
messages.append(
{
"role": "system",
"content": "Briefly introduce yourself by saying 'hello, I'm FastBot, how can I help you today?'"})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Bot")
parser.add_argument("-s", "--settings", type=str, required=True, help="Pipecat bot settings")
args, unknown = parser.parse_known_args()
try:
settings = BotSettings.model_validate_json(args.settings)
asyncio.run(main(settings))
except ValidationError as e:
print(e)

View File

@@ -0,0 +1,164 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import argparse
import subprocess
from pydantic import BaseModel, ValidationError
from typing import Optional
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from bot import BotSettings
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = ['DAILY_API_URL', 'DAILY_API_KEY', 'DEEPGRAM_API_KEY']
daily_rest_helper = DailyRESTHelper(
os.getenv("DAILY_API_KEY", ""),
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
class RunnerSettings(BaseModel):
prompt: Optional[
str] = "You are a fast, low-latency chatbot. Your goal is to demonstrate voice-driven AI capabilities at human-like speeds. When introducing yourself briefly mention your goal is to showcase speed and conversational flow. The technology powering you is Daily for transport, Cerebrium for GPU hosting, Llama 3 (8-B version) LLM, and Deepgram for speech-to-text and text-to-speech. You are hosted on the east coast of the United States. Respond to what the user said in a creative and helpful way, but keep responses short and legible. Ensure responses contain only words. Check again that you have not included special characters other than '?' or '!'."
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE")
openai_model: Optional[str] = os.getenv("OPENAI_MODEL", "gpt-4o")
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY")
test: Optional[bool] = None
# ----------------- API ----------------- #
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)
# ----------------- Main ----------------- #
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
runner_settings = RunnerSettings()
try:
request_body = await request.body()
if len(request_body) > 0:
runner_settings = RunnerSettings.model_validate_json(request_body)
except ValidationError as e:
raise HTTPException(
status_code=400,
detail=f"Invalid request: {e}")
except Exception as e:
# If no data in request, pass
pass
# Is this a webhook creation request?
if runner_settings.test is not None:
return JSONResponse({"test": True})
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
try:
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Spawn a new agent, and join the user session
try:
bot_settings = BotSettings(
room_url=room.url,
room_token=token,
prompt=runner_settings.prompt,
deepgram_voice=runner_settings.deepgram_voice,
openai_model=runner_settings.openai_model,
openai_api_key=runner_settings.openai_api_key,
)
bot_settings_str = bot_settings.model_dump_json(exclude_none=True)
subprocess.Popen(
[f"python3 -m bot -s '{bot_settings_str}'"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
# Grab a token for the user to join with
user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
if __name__ == "__main__":
# Check environment variables
for env_var in REQUIRED_ENV_VARS:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=True, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")

View File

@@ -0,0 +1,12 @@
DAILY_SAMPLE_ROOM_URL= #optional: use the same room each time, or create a new one if unset
DAILY_API_KEY=
DAILY_API_URL=
DEEPGRAM_API_KEY=
DEEPGRAM_VOICE=
DEEPGRAM_STT_URL=
DEEPGRAM_TTS_BASE_URL=
OPENAI_API_KEY=
OPENAI_MODEL=
OPENAI_BASE_URL=

View File

@@ -0,0 +1,267 @@
from loguru import logger
import asyncio
import math
import struct
import time
from dataclasses import dataclass, field
from typing import List
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
Frame,
AudioRawFrame,
InterimTranscriptionFrame,
TranscriptionFrame,
TextFrame,
StartInterruptionFrame,
LLMFullResponseStartFrame,
TTSStoppedFrame,
MetricsFrame
)
from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame
class GreedyLLMAggregator(FrameProcessor):
def __init__(self, context: OpenAILLMContext = None, **kwargs):
super().__init__(**kwargs)
self.context: OpenAILLMContext = context if context else OpenAILLMContext()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
logger.debug(f"{frame}")
try:
if isinstance(frame, InterimTranscriptionFrame):
return
if isinstance(frame, TranscriptionFrame):
# append transcribed text to last "user" frame
if self.context.messages and self.context.messages[-1]["role"] == "user":
last_frame = self.context.messages.pop()
else:
last_frame = {"role": "user", "content": ""}
last_frame["content"] += " " + frame.text
self.context.messages.append(last_frame)
oai_context_frame = OpenAILLMContextFrame(context=self.context)
logger.debug(f"pushing frame {oai_context_frame}")
await self.push_frame(oai_context_frame)
return
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"error: {e}")
class ClearableDeepgramTTSService(DeepgramTTSService):
def __init___(self, **kwargs):
super().__init(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
self._current_sentence = ""
@dataclass
class BufferedSentence:
audio_frames: List[AudioRawFrame] = field(default_factory=list)
text_frame: TextFrame = None
class VADGate(FrameProcessor):
def __init__(
self,
vad_analyzer: VADAnalyzer = None,
context: OpenAILLMContext = None,
**kwargs):
super().__init__(**kwargs)
self.vad_analyzer = vad_analyzer
self.context = context
self._audio_pusher_task = None
self._expect_text_frame_next = False
self._sentences: List[BufferedSentence] = []
# queue output from tts one sentence at a time. associate a buffer of audio frames with the content of
# each text frame.
#
# start a coroutine to service the queue and send sentences down the pipeline when possible.
# 1. do not send anything when we are not in VADState.QUIET
# 2. if we are in VADState.QUIET, send a sentence, estimate how long it will take for that sentence
# to output, sleep until it's time to send another sentence
# 3. each time we send a sentence, append it to the conversation context
# 3. when the sentence buffer becomes empty, cancel the coroutine
# 4. if we get a new LLMFullResponse, treat that as a cancellation, too
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
try:
# A TTSService will emit a series of AudioRawFrame objects, then a TTSStoppedFrame,
# then a TextFrame.
if self._expect_text_frame_next:
self._expect_text_frame_next = False
if isinstance(frame, TextFrame):
self._sentences[-1].text_frame = frame
else:
logger.debug(f"expected a text frame, but received {frame}")
await self.push_frame(frame, direction)
return
else:
if isinstance(frame, TextFrame):
logger.error(f"XXXXXXXXXXXXXXXXXXX received a text frame, wasn't expecting it.")
if isinstance(frame, AudioRawFrame):
# if our buffer is empty or has a "finished" sentence at the end,
# then we need to start buffering a new sentence
if not self._sentences or self._sentences[-1].text_frame:
self._sentences.append(BufferedSentence())
self._sentences[-1].audio_frames.append(frame)
await self.maybe_start_audio_pusher_task()
return
if isinstance(frame, TTSStoppedFrame):
self._expect_text_frame_next = True
await self.push_frame(frame, direction)
return
# There are two ways we can be interrupted. During greedy inference, a new
# LLM response can start. Or, during playout, we can get a traditional
# user interruption frame.
if (isinstance(frame, LLMFullResponseStartFrame) or
isinstance(frame, StartInterruptionFrame)):
logger.debug(f"{frame} - Handle interruption in VADGate")
self._sentences = []
if self._audio_pusher_task:
self._audio_pusher_task.cancel()
self._audio_pusher_task = None
await self.push_frame(frame, direction)
return
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"error: {e}")
async def maybe_start_audio_pusher_task(self):
try:
if self._audio_pusher_task:
return
self._audio_pusher_task = self.get_event_loop().create_task(self.push_audio())
except Exception as e:
logger.debug(f"Exception {e}")
async def push_audio(self):
try:
while True:
if not self._sentences:
await asyncio.sleep(0.01)
continue
if self.vad_analyzer._vad_state != VADState.QUIET:
await asyncio.sleep(0.01)
continue
# we only want to push completed sentence buffers
if not self._sentences[0].text_frame:
await asyncio.sleep(0.01)
continue
s = self._sentences.pop(0)
if not s.audio_frames:
continue
sample_rate = s.audio_frames[0].sample_rate
duration = 0
logger.debug(f"Pushing {len(s.audio_frames)} audio frames")
for frame in s.audio_frames:
await self.push_frame(frame)
# assume linear16 encoding (2 bytes per sample). todo: add some more
# metadata to AudioRawFrame, maybe
duration += (len(frame.audio) / 2 / frame.num_channels) / sample_rate
await asyncio.sleep(duration - 20 / 1000)
if self.context:
logger.debug(f"Appending assistant message to context: [{s.text_frame.text}]")
self.context.messages.append(
{"role": "assistant", "content": s.text_frame.text}
)
await self.push_frame(s.text_frame)
except Exception as e:
logger.debug(f"Exception {e}")
class TranscriptionTimingLogger(FrameProcessor):
def __init__(self, avt):
super().__init__()
self.name = "Transcription"
self._avt = avt
async def process_frame(self, frame: Frame, direction: FrameDirection):
try:
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
elapsed = time.time() - self._avt.last_transition_ts
logger.debug(f"Transcription TTF: {elapsed}")
await self.push_frame(MetricsFrame(ttfb={self.name: elapsed}))
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"Exception {e}")
class AudioVolumeTimer(FrameProcessor):
def __init__(self):
super().__init__()
self.last_transition_ts = 0
self._prev_volume = -80
self._speech_volume_threshold = -50
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, AudioRawFrame):
volume = self.calculate_volume(frame)
# print(f"Audio volume: {volume:.2f} dB")
if (volume >= self._speech_volume_threshold and
self._prev_volume < self._speech_volume_threshold):
# logger.debug("transition above speech volume threshold")
self.last_transition_ts = time.time()
elif (volume < self._speech_volume_threshold and
self._prev_volume >= self._speech_volume_threshold):
# logger.debug("transition below non-speech volume threshold")
self.last_transition_ts = time.time()
self._prev_volume = volume
await self.push_frame(frame, direction)
def calculate_volume(self, frame: AudioRawFrame) -> float:
if frame.num_channels != 1:
raise ValueError(f"Expected 1 channel, got {frame.num_channels}")
# Unpack audio data into 16-bit integers
fmt = f"{len(frame.audio) // 2}h"
audio_samples = struct.unpack(fmt, frame.audio)
# Calculate RMS
sum_squares = sum(sample**2 for sample in audio_samples)
rms = math.sqrt(sum_squares / len(audio_samples))
# Convert RMS to decibels (dB)
# Reference: maximum value for 16-bit audio is 32767
if rms > 0:
db = 20 * math.log10(rms / 32767)
else:
db = -96 # Minimum value (almost silent)
return db

View File

@@ -0,0 +1,6 @@
pipecat-ai[daily,openai,silero,deepgram]
fastapi
uvicorn
requests
python-dotenv
loguru

View File

@@ -27,10 +27,8 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True))
@@ -54,4 +52,5 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -28,10 +28,8 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
@@ -66,4 +64,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -27,10 +27,8 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
@@ -66,4 +64,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -30,10 +30,8 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(room_url, None, "Static And Dynamic Speech")
meeting = TransportServiceOutput(transport, mic_enabled=True)
@@ -84,4 +82,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -73,10 +73,8 @@ class MonthPrepender(FrameProcessor):
await self.push_frame(frame, direction)
async def main():
async def main(room_url):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
@@ -164,4 +162,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -9,15 +9,14 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.logger import FrameLogger
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
@@ -35,18 +34,8 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class MetricsLogger(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, MetricsFrame):
print(
f"!!! MetricsFrame: {frame}, ttfb: {frame.ttfb}, processing: {frame.processing}, tokens: {frame.tokens}, characters: {frame.characters}")
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -67,10 +56,11 @@ async def main():
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o"
)
model="gpt-4o")
ml = MetricsLogger()
fl = FrameLogger("!!! after LLM", "red")
fltts = FrameLogger("@@@ out of tts", "green")
flend = FrameLogger("### out of the end", "magenta")
messages = [
{
@@ -85,18 +75,15 @@ async def main():
transport.input(),
tma_in,
llm,
fl,
tts,
ml,
fltts,
transport.output(),
tma_out,
flend
])
task = PipelineTask(pipeline)
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=False,
))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
@@ -112,4 +99,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -51,7 +51,7 @@ class ImageSyncAggregator(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM:
if not isinstance(frame, SystemFrame):
await self.push_frame(ImageRawFrame(image=self._speaking_image_bytes, size=(1024, 1024), format=self._speaking_image_format))
await self.push_frame(frame)
await self.push_frame(ImageRawFrame(image=self._waiting_image_bytes, size=(1024, 1024), format=self._waiting_image_format))
@@ -59,22 +59,19 @@ class ImageSyncAggregator(FrameProcessor):
await self.push_frame(frame)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_analyzer=SileroVADAnalyzer()
)
)
@@ -119,7 +116,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
participant_name = participant["info"]["userName"] or ''
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([TextFrame(f"Hi there {participant_name}!")])
await task.queue_frames([TextFrame(f"Hi, this is {participant_name}.")])
runner = PipelineRunner()
@@ -127,4 +124,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -31,10 +31,8 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -79,7 +77,6 @@ async def main():
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
))
@@ -97,4 +94,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -31,10 +31,8 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -93,4 +91,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -47,10 +47,8 @@ def get_session_history(session_id: str) -> BaseChatMessageHistory:
return message_store[session_id]
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -123,4 +121,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -31,10 +31,8 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -95,4 +93,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
@@ -32,66 +31,64 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_sample_rate=44100,
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
async def main(room_url: str, token):
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=44100,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
sample_rate=44100,
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="British Lady",
output_format="pcm_44100"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
tma_out, # Goes before the transport because cartesia has word-level timestamps!
transport.output(), # Transport bot output
])
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
@@ -31,66 +30,64 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
async def main(room_url: str, token):
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = PlayHTTTSService(
user_id=os.getenv("PLAYHT_USER_ID"),
api_key=os.getenv("PLAYHT_API_KEY"),
voice_url="s3://voice-cloning-zero-shot/801a663f-efd0-4254-98d0-5c175514c3e8/jennifer/manifest.json",
)
tts = PlayHTTTSService(
user_id=os.getenv("PLAYHT_USER_ID"),
api_key=os.getenv("PLAYHT_API_KEY"),
voice_url="s3://voice-cloning-zero-shot/801a663f-efd0-4254-98d0-5c175514c3e8/jennifer/manifest.json",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
@@ -31,73 +30,71 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
async def main(room_url: str, token):
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
)
stt = AzureSTTService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
stt = AzureSTTService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
tts = AzureTTSService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
tts = AzureTTSService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
llm = AzureLLMService(
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"),
)
llm = AzureLLMService(
api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
model=os.getenv("AZURE_CHATGPT_MODEL"),
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
@@ -31,65 +30,63 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
async def main(room_url: str, token):
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = OpenAITTSService(
api_key=os.getenv("OPENAI_API_KEY"),
voice="alloy"
)
tts = OpenAITTSService(
api_key=os.getenv("OPENAI_API_KEY"),
voice="alloy"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -34,10 +34,8 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -100,4 +98,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,97 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.xtts import XTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
)
)
tts = XTTSService(
aiohttp_session=session,
voice_id="Claribel Dervla",
language="en",
base_url="http://localhost:8000"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,102 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.xtts import XTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
)
stt = GladiaSTTService(
api_key=os.getenv("GLADIA_API_KEY"),
)
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-helios-en"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -22,10 +22,8 @@ logger = logging.getLogger("pipecat")
logger.setLevel(logging.DEBUG)
async def main():
async def main(room_url: str):
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
@@ -146,4 +144,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import sys
@@ -24,34 +23,32 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url, token, "Test",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720
)
async def main(room_url, token):
transport = DailyTransport(
room_url, token, "Test",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720
)
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
pipeline = Pipeline([transport.input(), transport.output()])
pipeline = Pipeline([transport.input(), transport.output()])
runner = PipelineRunner()
runner = PipelineRunner()
task = PipelineTask(pipeline)
task = PipelineTask(pipeline)
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import sys
@@ -28,44 +27,40 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
async def main(room_url, token):
tk_root = tk.Tk()
tk_root.title("Local Mirror")
tk_root = tk.Tk()
tk_root.title("Local Mirror")
daily_transport = DailyTransport(room_url, token, "Test", DailyParams(audio_in_enabled=True))
daily_transport = DailyTransport(
room_url, token, "Test", DailyParams(
audio_in_enabled=True))
tk_transport = TkLocalTransport(
tk_root,
TransportParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720))
tk_transport = TkLocalTransport(
tk_root,
TransportParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
camera_out_width=1280,
camera_out_height=720))
@daily_transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
@daily_transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
pipeline = Pipeline([daily_transport.input(), tk_transport.output()])
pipeline = Pipeline([daily_transport.input(), tk_transport.output()])
task = PipelineTask(pipeline)
task = PipelineTask(pipeline)
async def run_tk():
while not task.has_finished():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)
async def run_tk():
while not task.has_finished():
tk_root.update()
tk_root.update_idletasks()
await asyncio.sleep(0.1)
runner = PipelineRunner()
runner = PipelineRunner()
await asyncio.gather(runner.run(task), run_tk())
await asyncio.gather(runner.run(task), run_tk())
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -31,10 +31,9 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
@@ -91,4 +90,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -83,10 +83,8 @@ class InboundSoundEffectWrapper(FrameProcessor):
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -150,4 +148,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -49,10 +49,8 @@ class UserImageRequester(FrameProcessor):
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -110,4 +108,5 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -49,10 +49,8 @@ class UserImageRequester(FrameProcessor):
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -106,4 +104,5 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -49,10 +49,8 @@ class UserImageRequester(FrameProcessor):
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -106,4 +104,5 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -49,10 +49,8 @@ class UserImageRequester(FrameProcessor):
await self.push_frame(frame, direction)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -106,4 +104,5 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import sys
@@ -36,25 +35,23 @@ class TranscriptionLogger(FrameProcessor):
print(f"Transcription: {frame.text}")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
async def main(room_url: str):
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
stt = WhisperSTTService()
stt = WhisperSTTService()
tl = TranscriptionLogger()
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
task = PipelineTask(pipeline)
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -4,7 +4,6 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import os
import sys
@@ -37,25 +36,23 @@ class TranscriptionLogger(FrameProcessor):
print(f"Transcription: {frame.text}")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
async def main(room_url: str):
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
transport = DailyTransport(room_url, None, "Transcription bot",
DailyParams(audio_in_enabled=True))
stt = DeepgramSTTService(os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tl = TranscriptionLogger()
tl = TranscriptionLogger()
pipeline = Pipeline([transport.input(), stt, tl])
pipeline = Pipeline([transport.input(), stt, tl])
task = PipelineTask(pipeline)
task = PipelineTask(pipeline)
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url))

View File

@@ -44,10 +44,8 @@ async def fetch_weather_from_api(llm, args):
return {"conditions": "nice", "temperature": "75"}
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -126,7 +124,7 @@ async def main():
task = PipelineTask(pipeline)
@transport.event_handler("on_first_participant_joined")
@ transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
@@ -136,5 +134,7 @@ async def main():
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -4,8 +4,8 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import aiohttp
import os
import sys
@@ -58,16 +58,15 @@ async def barbershop_man_filter(frame) -> bool:
return current_voice == "Barbershop Man"
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Pipecat",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=44100,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
@@ -76,17 +75,20 @@ async def main():
news_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="bf991597-6c13-47e4-8411-91ec2de5c466", # Newslady
voice_name="Newslady",
output_format="pcm_44100"
)
british_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
voice_name="British Lady",
output_format="pcm_44100"
)
barbershop_man = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
voice_name="Barbershop Man",
output_format="pcm_44100"
)
llm = OpenAILLMService(
@@ -153,4 +155,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -55,10 +55,8 @@ async def spanish_filter(frame) -> bool:
return current_language == "Spanish"
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -151,4 +149,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -8,6 +8,7 @@ import asyncio
import aiohttp
import os
import sys
import json
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -31,10 +32,8 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -127,4 +126,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,109 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."})
await user_idle.queue_frame(LLMMessagesFrame(messages))
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
pipeline = Pipeline([
transport.input(), # Transport user input
user_idle, # Idle user check-in
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,78 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import argparse
import sys
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure_with_args
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-i",
"--input",
type=str,
required=True,
help="Input video file")
(room_url, _, args) = await configure_with_args(session, parser)
transport = DailyTransport(
room_url,
None,
"GStreamer",
DailyParams(
audio_out_enabled=True,
audio_out_is_live=True,
camera_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,
camera_out_is_live=True,
)
)
gst = GStreamerPipelineSource(
pipeline=f"filesrc location={args.input}",
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280,
video_height=720,
audio_sample_rate=16000,
audio_channels=1,
)
)
pipeline = Pipeline([
gst, # GStreamer file source
transport.output(), # Transport bot output
])
task = PipelineTask(pipeline)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,64 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import sys
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"GStreamer",
DailyParams(
camera_out_enabled=True,
camera_out_width=1280,
camera_out_height=720,
camera_out_is_live=True,
)
)
gst = GStreamerPipelineSource(
pipeline="videotestsrc ! capsfilter caps=\"video/x-raw,width=1280,height=720,framerate=30/1\"",
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280,
video_height=720,
clock_sync=False))
pipeline = Pipeline([
gst, # GStreamer file source
transport.output(), # Transport bot output
])
task = PipelineTask(pipeline)
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,26 +1,12 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import argparse
import os
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
import time
import urllib
import requests
async def configure(aiohttp_session: aiohttp.ClientSession):
(url, token, _) = await configure_with_args(aiohttp_session)
return (url, token)
async def configure_with_args(
aiohttp_session: aiohttp.ClientSession,
parser: argparse.ArgumentParser | None = None):
if not parser:
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
def configure():
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
"--url",
@@ -47,15 +33,26 @@ async def configure_with_args(
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
room_name: str = urllib.parse.urlparse(url).path[1:]
expiration: float = time.time() + 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
res: requests.Response = requests.post(
f"https://api.daily.co/v1/meeting-tokens",
headers={
"Authorization": f"Bearer {key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True,
"exp": expiration}},
)
return (url, token, args)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return (url, token)

View File

@@ -1,9 +1,3 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
@@ -134,10 +128,8 @@ class ImageFilterProcessor(FrameProcessor):
await self.push_frame(frame)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -212,4 +204,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,4 +1,5 @@
python-dotenv
requests
fastapi[all]
uvicorn
pipecat-ai[daily,moondream,openai,silero]

View File

@@ -1,17 +1,11 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import argparse
import os
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
import time
import urllib
import requests
async def configure(aiohttp_session: aiohttp.ClientSession):
def configure():
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
@@ -39,16 +33,26 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
room_name: str = urllib.parse.urlparse(url).path[1:]
expiration: float = time.time() + 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
res: requests.Response = requests.post(
f"https://api.daily.co/v1/meeting-tokens",
headers={
"Authorization": f"Bearer {key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True,
"exp": expiration}},
)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return (url, token)

View File

@@ -1,51 +1,31 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import os
import argparse
import subprocess
from contextlib import asynccontextmanager
import atexit
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
from utils.daily_helpers import create_room as _create_room, get_token
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
daily_helpers = {}
def cleanup():
# Clean up function, just to be extra safe
for entry in bot_procs.values():
proc = entry[0]
for proc in bot_procs.values():
proc.terminate()
proc.wait()
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()
cleanup()
atexit.register(cleanup)
app = FastAPI(lifespan=lifespan)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
@@ -59,45 +39,45 @@ app.add_middleware(
@app.get("/start")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
print(f"!!! Room URL: {room.url}")
room_url, room_name = _create_room()
print(f"!!! Room URL: {room_url}")
# Ensure the room property is present
if not room.url:
if not room_url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room_url and proc[0].poll() is None)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
status_code=500, detail=f"Max bot limited reach for room: {room_url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
token = get_token(room_url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
f"python3 -m bot -u {room_url} -t {token}"
],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
)
bot_procs[proc.pid] = (proc, room.url)
bot_procs[proc.pid] = (proc, room_url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
return RedirectResponse(room_url)
@app.get("/status/{pid}")

View File

@@ -0,0 +1,109 @@
import urllib.parse
import os
import time
import urllib
import requests
from dotenv import load_dotenv
load_dotenv()
daily_api_path = os.getenv("DAILY_API_URL") or "api.daily.co/v1"
daily_api_key = os.getenv("DAILY_API_KEY")
def create_room() -> tuple[str, str]:
"""
Helper function to create a Daily room.
# See: https://docs.daily.co/reference/rest-api/rooms
Returns:
tuple: A tuple containing the room URL and room name.
Raises:
Exception: If the request to create the room fails or if the response does not contain the room URL or room name.
"""
room_props = {
"exp": time.time() + 60 * 60, # 1 hour
"enable_chat": True,
"enable_emoji_reactions": True,
"eject_at_room_exp": True,
"enable_prejoin_ui": False, # Important for the bot to be able to join headlessly
}
res = requests.post(
f"https://{daily_api_path}/rooms",
headers={"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": room_props
},
)
if res.status_code != 200:
raise Exception(f"Unable to create room: {res.text}")
data = res.json()
room_url: str = data.get("url")
room_name: str = data.get("name")
if room_url is None or room_name is None:
raise Exception("Missing room URL or room name in response")
return room_url, room_name
def get_name_from_url(room_url: str) -> str:
"""
Extracts the name from a given room URL.
Args:
room_url (str): The URL of the room.
Returns:
str: The extracted name from the room URL.
"""
return urllib.parse.urlparse(room_url).path[1:]
def get_token(room_url: str) -> str:
"""
Retrieves a meeting token for the specified Daily room URL.
# See: https://docs.daily.co/reference/rest-api/meeting-tokens
Args:
room_url (str): The URL of the Daily room.
Returns:
str: The meeting token.
Raises:
Exception: If no room URL is specified or if no Daily API key is specified.
Exception: If there is an error creating the meeting token.
"""
if not room_url:
raise Exception(
"No Daily room specified. You must specify a Daily room in order a token to be generated.")
if not daily_api_key:
raise Exception(
"No Daily API key specified. set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
expiration: float = time.time() + 60 * 60
room_name = get_name_from_url(room_url)
res: requests.Response = requests.post(
f"https://{daily_api_path}/meeting-tokens",
headers={
"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True, # Owner tokens required for transcription
"exp": expiration}},
)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return token

View File

@@ -257,16 +257,17 @@ class IntakeProcessor:
return None
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=576,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
@@ -350,4 +351,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,4 +1,5 @@
python-dotenv
requests
fastapi[all]
uvicorn
pipecat-ai[daily,openai,silero]

View File

@@ -1,17 +1,11 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import argparse
import os
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
import time
import urllib
import requests
async def configure(aiohttp_session: aiohttp.ClientSession):
def configure():
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
@@ -39,15 +33,26 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
room_name: str = urllib.parse.urlparse(url).path[1:]
expiration: float = time.time() + 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
res: requests.Response = requests.post(
f"https://api.daily.co/v1/meeting-tokens",
headers={
"Authorization": f"Bearer {key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True,
"exp": expiration}},
)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return (url, token)

View File

@@ -1,51 +1,31 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import os
import argparse
import subprocess
from contextlib import asynccontextmanager
import atexit
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
from utils.daily_helpers import create_room as _create_room, get_token
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
daily_helpers = {}
def cleanup():
# Clean up function, just to be extra safe
for entry in bot_procs.values():
proc = entry[0]
for proc in bot_procs.values():
proc.terminate()
proc.wait()
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()
cleanup()
atexit.register(cleanup)
app = FastAPI(lifespan=lifespan)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
@@ -59,45 +39,45 @@ app.add_middleware(
@app.get("/start")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
print(f"!!! Room URL: {room.url}")
room_url, room_name = _create_room()
print(f"!!! Room URL: {room_url}")
# Ensure the room property is present
if not room.url:
if not room_url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room_url and proc[0].poll() is None)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
status_code=500, detail=f"Max bot limited reach for room: {room_url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
token = get_token(room_url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
f"python3 -m bot -u {room_url} -t {token}"
],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
)
bot_procs[proc.pid] = (proc, room.url)
bot_procs[proc.pid] = (proc, room_url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
return RedirectResponse(room_url)
@app.get("/status/{pid}")

View File

@@ -0,0 +1,109 @@
import urllib.parse
import os
import time
import urllib
import requests
from dotenv import load_dotenv
load_dotenv()
daily_api_path = os.getenv("DAILY_API_URL") or "api.daily.co/v1"
daily_api_key = os.getenv("DAILY_API_KEY")
def create_room() -> tuple[str, str]:
"""
Helper function to create a Daily room.
# See: https://docs.daily.co/reference/rest-api/rooms
Returns:
tuple: A tuple containing the room URL and room name.
Raises:
Exception: If the request to create the room fails or if the response does not contain the room URL or room name.
"""
room_props = {
"exp": time.time() + 60 * 60, # 1 hour
"enable_chat": True,
"enable_emoji_reactions": True,
"eject_at_room_exp": True,
"enable_prejoin_ui": False, # Important for the bot to be able to join headlessly
}
res = requests.post(
f"https://{daily_api_path}/rooms",
headers={"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": room_props
},
)
if res.status_code != 200:
raise Exception(f"Unable to create room: {res.text}")
data = res.json()
room_url: str = data.get("url")
room_name: str = data.get("name")
if room_url is None or room_name is None:
raise Exception("Missing room URL or room name in response")
return room_url, room_name
def get_name_from_url(room_url: str) -> str:
"""
Extracts the name from a given room URL.
Args:
room_url (str): The URL of the room.
Returns:
str: The extracted name from the room URL.
"""
return urllib.parse.urlparse(room_url).path[1:]
def get_token(room_url: str) -> str:
"""
Retrieves a meeting token for the specified Daily room URL.
# See: https://docs.daily.co/reference/rest-api/meeting-tokens
Args:
room_url (str): The URL of the Daily room.
Returns:
str: The meeting token.
Raises:
Exception: If no room URL is specified or if no Daily API key is specified.
Exception: If there is an error creating the meeting token.
"""
if not room_url:
raise Exception(
"No Daily room specified. You must specify a Daily room in order a token to be generated.")
if not daily_api_key:
raise Exception(
"No Daily API key specified. set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
expiration: float = time.time() + 60 * 60
room_name = get_name_from_url(room_url)
res: requests.Response = requests.post(
f"https://{daily_api_path}/meeting-tokens",
headers={
"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True, # Owner tokens required for transcription
"exp": expiration}},
)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return token

View File

@@ -1,9 +1,3 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
@@ -83,10 +77,8 @@ class TalkingAnimation(FrameProcessor):
await self.push_frame(frame)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -173,4 +165,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,4 +1,5 @@
python-dotenv
requests
fastapi[all]
uvicorn
pipecat-ai[daily,openai,silero]

View File

@@ -1,17 +1,11 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import argparse
import os
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
import time
import urllib
import requests
async def configure(aiohttp_session: aiohttp.ClientSession):
def configure():
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
@@ -39,16 +33,26 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
room_name: str = urllib.parse.urlparse(url).path[1:]
expiration: float = time.time() + 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
res: requests.Response = requests.post(
f"https://api.daily.co/v1/meeting-tokens",
headers={
"Authorization": f"Bearer {key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True,
"exp": expiration}},
)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return (url, token)

View File

@@ -1,51 +1,31 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import os
import argparse
import subprocess
from contextlib import asynccontextmanager
import atexit
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
from utils.daily_helpers import create_room as _create_room, get_token
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
daily_helpers = {}
def cleanup():
# Clean up function, just to be extra safe
for entry in bot_procs.values():
proc = entry[0]
for proc in bot_procs.values():
proc.terminate()
proc.wait()
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()
cleanup()
atexit.register(cleanup)
app = FastAPI(lifespan=lifespan)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
@@ -59,45 +39,45 @@ app.add_middleware(
@app.get("/start")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
print(f"!!! Room URL: {room.url}")
room_url, room_name = _create_room()
print(f"!!! Room URL: {room_url}")
# Ensure the room property is present
if not room.url:
if not room_url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room_url and proc[0].poll() is None)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
status_code=500, detail=f"Max bot limited reach for room: {room_url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
token = get_token(room_url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
f"python3 -m bot -u {room_url} -t {token}"
],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
)
bot_procs[proc.pid] = (proc, room.url)
bot_procs[proc.pid] = (proc, room_url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
return RedirectResponse(room_url)
@app.get("/status/{pid}")

View File

@@ -0,0 +1,109 @@
import urllib.parse
import os
import time
import urllib
import requests
from dotenv import load_dotenv
load_dotenv()
daily_api_path = os.getenv("DAILY_API_URL") or "api.daily.co/v1"
daily_api_key = os.getenv("DAILY_API_KEY")
def create_room() -> tuple[str, str]:
"""
Helper function to create a Daily room.
# See: https://docs.daily.co/reference/rest-api/rooms
Returns:
tuple: A tuple containing the room URL and room name.
Raises:
Exception: If the request to create the room fails or if the response does not contain the room URL or room name.
"""
room_props = {
"exp": time.time() + 60 * 60, # 1 hour
"enable_chat": True,
"enable_emoji_reactions": True,
"eject_at_room_exp": True,
"enable_prejoin_ui": False, # Important for the bot to be able to join headlessly
}
res = requests.post(
f"https://{daily_api_path}/rooms",
headers={"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": room_props
},
)
if res.status_code != 200:
raise Exception(f"Unable to create room: {res.text}")
data = res.json()
room_url: str = data.get("url")
room_name: str = data.get("name")
if room_url is None or room_name is None:
raise Exception("Missing room URL or room name in response")
return room_url, room_name
def get_name_from_url(room_url: str) -> str:
"""
Extracts the name from a given room URL.
Args:
room_url (str): The URL of the room.
Returns:
str: The extracted name from the room URL.
"""
return urllib.parse.urlparse(room_url).path[1:]
def get_token(room_url: str) -> str:
"""
Retrieves a meeting token for the specified Daily room URL.
# See: https://docs.daily.co/reference/rest-api/meeting-tokens
Args:
room_url (str): The URL of the Daily room.
Returns:
str: The meeting token.
Raises:
Exception: If no room URL is specified or if no Daily API key is specified.
Exception: If there is an error creating the meeting token.
"""
if not room_url:
raise Exception(
"No Daily room specified. You must specify a Daily room in order a token to be generated.")
if not daily_api_key:
raise Exception(
"No Daily API key specified. set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
expiration: float = time.time() + 60 * 60
room_name = get_name_from_url(room_url)
res: requests.Response = requests.post(
f"https://{daily_api_path}/meeting-tokens",
headers={
"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True, # Owner tokens required for transcription
"exp": expiration}},
)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return token

View File

@@ -1,4 +1,4 @@
FROM python:3.11-slim-bookworm
FROM python:3.11-bullseye
ARG DEBIAN_FRONTEND=noninteractive
ARG USE_PERSISTENT_DATA
@@ -51,4 +51,4 @@ COPY --chown=user ./frontend/ frontend/
RUN cd frontend && npm install && npm run build
# Start the FastAPI server
CMD python3 src/bot_runner.py --port ${FAST_API_PORT}
CMD python3 src/server.py --port ${FAST_API_PORT}

View File

@@ -48,8 +48,6 @@ pip install -r requirements.txt
mv env.example .env
```
When deploying to production, to ensure only this app can spawn a new bot, set your `ENV` to `production`
**Build the frontend:**
This project uses a custom frontend, which needs to built. Note: this is done automatically as part of the Docker deployment.
@@ -66,11 +64,11 @@ The build UI files can be found in `frontend/out`
Start the API / bot manager:
`python src/bot_runner.py`
`python src/server.py`
If you'd like to run a custom domain or port:
`python src/bot_runner.py --host somehost --p someport`
`python src/server.py --host somehost --p 7777`
➡️ Open the host URL in your browser `http://localhost:7860`

View File

@@ -1,9 +1,5 @@
DAILY_API_KEY=
DAILY_SAMPLE_ROOM_URL=
ELEVENLABS_API_KEY=
ELEVENLABS_VOICE_ID=
FAL_KEY=
OPENAI_API_KEY=
ENV= # dev | production
RUN_AS_VM= # Set this if you want to run bots on process (not launch a new VM)
DAILY_API_KEY=7df...
ELEVENLABS_API_KEY=aeb...
ELEVENLABS_VOICE_ID=7S...
FAL_KEY=8c...
OPENAI_API_KEY=sk-PL...

View File

@@ -27,11 +27,14 @@ export default function Call() {
// Create a new room for the story session
try {
const response = await fetch("/start_bot", {
const response = await fetch("/create", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
room_url: process.env.NEXT_PUBLIC_ROOM_URL || null,
}),
});
const { room_url, token } = await response.json();
@@ -52,9 +55,21 @@ export default function Call() {
// Disable local audio, the bot will say hello first
daily.setLocalAudio(false);
// Start the bot
const resp = await fetch("/start", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
room_url,
}),
});
setState("started");
} catch (error) {
setState("error");
leave();
}
}
@@ -64,13 +79,7 @@ export default function Call() {
}
if (state === "error") {
return (
<div className="flex items-center mx-auto">
<p className="text-red-500 font-semibold bg-white px-4 py-2 shadow-xl rounded-lg">
This demo is currently at capacity. Please try again later.
</p>
</div>
);
return <div>An Error occured</div>;
}
if (state === "started") {

View File

@@ -108,26 +108,26 @@ export default function DevicePicker({}: Props) {
{hasMicError && (
<div className="error">
{micState === "blocked" ? (
<p className="text-red-500">
<p>
Please check your browser and system permissions. Make sure that
this app is allowed to access your microphone.
</p>
) : micState === "in-use" ? (
<p className="text-red-500">
<p>
Your microphone is being used by another app. Please close any
other apps using your microphone and restart this app.
</p>
) : micState === "not-found" ? (
<p className="text-red-500">
<p>
No microphone seems to be connected. Please connect a microphone.
</p>
) : micState === "not-supported" ? (
<p className="text-red-500">
<p>
This app is not supported on your device. Please update your
software or use a different device.
</p>
) : (
<p className="text-red-500">
<p>
There seems to be an issue accessing your microphone. Try
restarting the app or consult a system administrator.
</p>

View File

@@ -1,7 +1,7 @@
import React from "react";
import { Button } from "@/components/ui/button";
import DevicePicker from "@/components/DevicePicker";
import { IconAlertCircle, IconEar, IconLoader2 } from "@tabler/icons-react";
import { IconEar, IconLoader2 } from "@tabler/icons-react";
type SetupProps = {
handleStart: () => void;
@@ -24,6 +24,7 @@ export const Setup: React.FC<SetupProps> = ({ handleStart }) => {
<h1 className="text-4xl font-bold text-pretty tracking-tighter mb-4">
Welcome to <span className="text-sky-500">Storytime</span>
</h1>
{state === "intro" ? (
<>
<p className="text-gray-600 leading-relaxed text-pretty">
@@ -37,9 +38,6 @@ export const Setup: React.FC<SetupProps> = ({ handleStart }) => {
<IconEar size={24} /> For best results, try in a quiet
environment!
</p>
<p className="flex flex-row gap-2 text-gray-600 font-medium text-red-500">
<IconAlertCircle size={24} /> This demo expires after 5 minutes.
</p>
</>
) : (
<>
@@ -51,6 +49,7 @@ export const Setup: React.FC<SetupProps> = ({ handleStart }) => {
<DevicePicker />
</>
)}
<hr className="border-gray-150 my-2" />
<Button

View File

@@ -1 +1,2 @@
NEXT_PUBLIC_ROOM_URL=
SITE_URL=

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,6 @@
async_timeout
fastapi
uvicorn
requests
python-dotenv
pipecat-ai[daily,openai,fal]

View File

@@ -5,7 +5,7 @@ import os
import sys
from pipecat.frames.frames import LLMMessagesFrame, StopTaskFrame, EndFrame
from pipecat.frames.frames import LLMMessagesFrame, StopTaskFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -139,16 +139,6 @@ async def main(room_url, token=None):
main_task = PipelineTask(main_pipeline)
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
intro_task.queue_frame(EndFrame())
await main_task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
await main_task.queue_frame(EndFrame())
await runner.run(main_task)
if __name__ == "__main__":

View File

@@ -1,251 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import argparse
import subprocess
import os
from pathlib import Path
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse
from pipecat.transports.services.helpers.daily_rest import (
DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams)
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Fast API Config ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
daily_helpers = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount the static directory
STATIC_DIR = "frontend/out"
# ------------ Fast API Routes ------------ #
app.mount("/static", StaticFiles(directory=STATIC_DIR, html=True), name="static")
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
if os.getenv("ENV", "dev") == "production":
# Only allow requests from the specified domain
host_header = request.headers.get("host")
allowed_domains = ["storytelling-chatbot.fly.dev", "www.storytelling-chatbot.fly.dev"]
# Check if the Host header matches the allowed domain
if host_header not in allowed_domains:
raise HTTPException(status_code=403, detail="Access denied")
try:
data = await request.json()
# Is this a webhook creation request?
if "test" in data:
return JSONResponse({"test": True})
except Exception as e:
pass
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
try:
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Launch a new VM, or run as a shell process (not recommended)
if os.getenv("RUN_AS_VM", False):
try:
await virtualize_bot(room.url, token)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to spawn VM: {e}")
else:
try:
subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
# Grab a token for the user to join with
user_token = await daily_helpers["rest"].get_token(room.url, MAX_SESSION_TIME)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
@app.get("/{path_name:path}", response_class=FileResponse)
async def catch_all(path_name: Optional[str] = ""):
if path_name == "":
return FileResponse(f"{STATIC_DIR}/index.html")
file_path = Path(STATIC_DIR) / (path_name or "")
if file_path.is_file():
return file_path
html_file_path = file_path.with_suffix(".html")
if html_file_path.is_file():
return FileResponse(html_file_path)
raise HTTPException(status_code=450, detail="Incorrect API call")
# ------------ Virtualization ------------ #
async def virtualize_bot(room_url: str, token: str):
"""
This is an example of how to virtualize the bot using Fly.io
You can adapt this method to use whichever cloud provider you prefer.
"""
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "storytelling-chatbot")
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
FLY_HEADERS = {
'Authorization': f"Bearer {FLY_API_KEY}",
'Content-Type': 'application/json'
}
async with aiohttp.ClientSession() as session:
# Use the same image as the bot runner
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Unable to get machine info from Fly: {text}")
data = await r.json()
image = data[0]['config']['image']
# Machine configuration
cmd = f"python3 src/bot.py -u {room_url} -t {token}"
cmd = cmd.split()
worker_props = {
"config": {
"image": image,
"auto_destroy": True,
"init": {
"cmd": cmd
},
"restart": {
"policy": "no"
},
"guest": {
"cpu_kind": "shared",
"cpus": 1,
"memory_mb": 512
}
},
}
# Spawn a new machine instance
async with session.post(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS, json=worker_props) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Problem starting a bot worker: {text}")
data = await r.json()
# Wait for the machine to enter the started state
vm_id = data['id']
async with session.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started", headers=FLY_HEADERS) as r:
if r.status != 200:
text = await r.text()
raise Exception(f"Bot was unable to enter started state: {text}")
print(f"Machine joined room: {room_url}")
# ------------ Main ------------ #
if __name__ == "__main__":
# Check environment variables
required_env_vars = ['OPENAI_API_KEY', 'DAILY_API_KEY',
'FAL_KEY', 'ELEVENLABS_VOICE_ID', 'ELEVENLABS_API_KEY']
for env_var in required_env_vars:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
import uvicorn
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
config = parser.parse_args()
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)

View File

@@ -0,0 +1,175 @@
import os
import argparse
import subprocess
import atexit
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse
from utils.daily_helpers import create_room as _create_room, get_token, get_name_from_url
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
def cleanup():
# Clean up function, just to be extra safe
for proc in bot_procs.values():
proc.terminate()
proc.wait()
atexit.register(cleanup)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount the static directory
STATIC_DIR = "frontend/out"
app.mount("/static", StaticFiles(directory=STATIC_DIR, html=True), name="static")
@app.post("/create")
async def create_room(request: Request) -> JSONResponse:
data = await request.json()
if data.get('room_url') is not None:
room_url = data.get('room_url')
room_name = get_name_from_url(room_url)
else:
room_url, room_name = _create_room()
token = get_token(room_url)
return JSONResponse({"room_url": room_url, "room_name": room_name, "token": token})
@app.post("/start")
async def start_agent(request: Request) -> JSONResponse:
data = await request.json()
# Is this a webhook creation request?
if "test" in data:
return JSONResponse({"test": True})
# Ensure the room property is present
room_url = data.get('room_url')
if not room_url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room_url and proc[0].poll() is None)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room_url}")
# Get the token for the room
token = get_token(room_url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room_url} -t {token}"
],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
)
bot_procs[proc.pid] = (proc, room_url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
return JSONResponse({"bot_id": proc.pid, "room_url": room_url})
@app.get("/status/{pid}")
def get_status(pid: int):
# Look up the subprocess
proc = bot_procs.get(pid)
# If the subprocess doesn't exist, return an error
if not proc:
raise HTTPException(
status_code=404, detail=f"Bot with process id: {pid} not found")
# Check the status of the subprocess
if proc[0].poll() is None:
status = "running"
else:
status = "finished"
return JSONResponse({"bot_id": pid, "status": status})
@app.get("/{path_name:path}", response_class=FileResponse)
async def catch_all(path_name: Optional[str] = ""):
if path_name == "":
return FileResponse(f"{STATIC_DIR}/index.html")
file_path = Path(STATIC_DIR) / (path_name or "")
if file_path.is_file():
return file_path
html_file_path = file_path.with_suffix(".html")
if html_file_path.is_file():
return FileResponse(html_file_path)
raise HTTPException(status_code=450, detail="Incorrect API call")
if __name__ == "__main__":
# Check environment variables
required_env_vars = ['OPENAI_API_KEY', 'DAILY_API_KEY',
'FAL_KEY', 'ELEVENLABS_VOICE_ID', 'ELEVENLABS_API_KEY']
for env_var in required_env_vars:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
import uvicorn
default_host = os.getenv("HOST", "0.0.0.0")
default_port = int(os.getenv("FAST_API_PORT", "7860"))
parser = argparse.ArgumentParser(
description="Daily Storyteller FastAPI server")
parser.add_argument("--host", type=str,
default=default_host, help="Host address")
parser.add_argument("--port", type=int,
default=default_port, help="Port number")
parser.add_argument("--reload", action="store_true",
help="Reload code on change")
config = parser.parse_args()
uvicorn.run(
"server:app",
host=config.host,
port=config.port,
reload=config.reload
)

View File

@@ -0,0 +1,109 @@
import urllib.parse
import os
import time
import urllib
import requests
from dotenv import load_dotenv
load_dotenv()
daily_api_path = os.getenv("DAILY_API_URL") or "api.daily.co/v1"
daily_api_key = os.getenv("DAILY_API_KEY")
def create_room() -> tuple[str, str]:
"""
Helper function to create a Daily room.
# See: https://docs.daily.co/reference/rest-api/rooms
Returns:
tuple: A tuple containing the room URL and room name.
Raises:
Exception: If the request to create the room fails or if the response does not contain the room URL or room name.
"""
room_props = {
"exp": time.time() + 60 * 60, # 1 hour
"enable_chat": True,
"enable_emoji_reactions": True,
"eject_at_room_exp": True,
"enable_prejoin_ui": False, # Important for the bot to be able to join headlessly
}
res = requests.post(
f"https://{daily_api_path}/rooms",
headers={"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": room_props
},
)
if res.status_code != 200:
raise Exception(f"Unable to create room: {res.text}")
data = res.json()
room_url: str = data.get("url")
room_name: str = data.get("name")
if room_url is None or room_name is None:
raise Exception("Missing room URL or room name in response")
return room_url, room_name
def get_name_from_url(room_url: str) -> str:
"""
Extracts the name from a given room URL.
Args:
room_url (str): The URL of the room.
Returns:
str: The extracted name from the room URL.
"""
return urllib.parse.urlparse(room_url).path[1:]
def get_token(room_url: str) -> str:
"""
Retrieves a meeting token for the specified Daily room URL.
# See: https://docs.daily.co/reference/rest-api/meeting-tokens
Args:
room_url (str): The URL of the Daily room.
Returns:
str: The meeting token.
Raises:
Exception: If no room URL is specified or if no Daily API key is specified.
Exception: If there is an error creating the meeting token.
"""
if not room_url:
raise Exception(
"No Daily room specified. You must specify a Daily room in order a token to be generated.")
if not daily_api_key:
raise Exception(
"No Daily API key specified. set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
expiration: float = time.time() + 60 * 60
room_name = get_name_from_url(room_url)
res: requests.Response = requests.post(
f"https://{daily_api_path}/meeting-tokens",
headers={
"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True, # Owner tokens required for transcription
"exp": expiration}},
)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return token

View File

@@ -1,11 +1,5 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import asyncio
import aiohttp
import os
import sys
@@ -18,11 +12,7 @@ from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.azure import AzureTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import (
DailyParams,
DailyTranscriptionSettings,
DailyTransport,
DailyTransportMessageFrame)
from pipecat.transports.services.daily import DailyParams, DailyTranscriptionSettings, DailyTransport, DailyTransportMessageFrame
from runner import configure
@@ -89,10 +79,8 @@ class TranslationSubtitles(FrameProcessor):
await self.push_frame(frame)
async def main():
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
@@ -145,4 +133,5 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -1,3 +1,4 @@
python-dotenv
requests
fastapi[all]
pipecat-ai[daily,openai,azure]

View File

@@ -1,18 +1,11 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import os
import aiohttp
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
import time
import urllib
import requests
async def configure(aiohttp_session: aiohttp.ClientSession):
def configure():
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
parser.add_argument(
"-u",
@@ -40,16 +33,26 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
if not key:
raise Exception("No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
daily_rest_helper = DailyRESTHelper(
daily_api_key=key,
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
aiohttp_session=aiohttp_session
)
# Create a meeting token for the given room with an expiration 1 hour in
# the future.
expiry_time: float = 60 * 60
room_name: str = urllib.parse.urlparse(url).path[1:]
expiration: float = time.time() + 60 * 60
token = await daily_rest_helper.get_token(url, expiry_time)
res: requests.Response = requests.post(
f"https://api.daily.co/v1/meeting-tokens",
headers={
"Authorization": f"Bearer {key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True,
"exp": expiration}},
)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return (url, token)

View File

@@ -1,51 +1,31 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
import os
import argparse
import subprocess
from contextlib import asynccontextmanager
import atexit
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
from utils.daily_helpers import create_room as _create_room, get_token
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control
bot_procs = {}
daily_helpers = {}
def cleanup():
# Clean up function, just to be extra safe
for entry in bot_procs.values():
proc = entry[0]
for proc in bot_procs.values():
proc.terminate()
proc.wait()
@asynccontextmanager
async def lifespan(app: FastAPI):
aiohttp_session = aiohttp.ClientSession()
daily_helpers["rest"] = DailyRESTHelper(
daily_api_key=os.getenv("DAILY_API_KEY", ""),
daily_api_url=os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'),
aiohttp_session=aiohttp_session
)
yield
await aiohttp_session.close()
cleanup()
atexit.register(cleanup)
app = FastAPI(lifespan=lifespan)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
@@ -59,45 +39,45 @@ app.add_middleware(
@app.get("/start")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
print(f"!!! Room URL: {room.url}")
room_url, room_name = _create_room()
print(f"!!! Room URL: {room_url}")
# Ensure the room property is present
if not room.url:
if not room_url:
raise HTTPException(
status_code=500,
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
# Check if there is already an existing process running in this room
num_bots_in_room = sum(
1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None)
1 for proc in bot_procs.values() if proc[1] == room_url and proc[0].poll() is None)
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
raise HTTPException(
status_code=500, detail=f"Max bot limited reach for room: {room.url}")
status_code=500, detail=f"Max bot limited reach for room: {room_url}")
# Get the token for the room
token = await daily_helpers["rest"].get_token(room.url)
token = get_token(room_url)
if not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room.url}")
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Spawn a new agent, and join the user session
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
try:
proc = subprocess.Popen(
[
f"python3 -m bot -u {room.url} -t {token}"
f"python3 -m bot -u {room_url} -t {token}"
],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__))
)
bot_procs[proc.pid] = (proc, room.url)
bot_procs[proc.pid] = (proc, room_url)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
return RedirectResponse(room.url)
return RedirectResponse(room_url)
@app.get("/status/{pid}")

View File

@@ -0,0 +1,109 @@
import urllib.parse
import os
import time
import urllib
import requests
from dotenv import load_dotenv
load_dotenv()
daily_api_path = os.getenv("DAILY_API_URL") or "api.daily.co/v1"
daily_api_key = os.getenv("DAILY_API_KEY")
def create_room() -> tuple[str, str]:
"""
Helper function to create a Daily room.
# See: https://docs.daily.co/reference/rest-api/rooms
Returns:
tuple: A tuple containing the room URL and room name.
Raises:
Exception: If the request to create the room fails or if the response does not contain the room URL or room name.
"""
room_props = {
"exp": time.time() + 60 * 60, # 1 hour
"enable_chat": True,
"enable_emoji_reactions": True,
"eject_at_room_exp": True,
"enable_prejoin_ui": False, # Important for the bot to be able to join headlessly
}
res = requests.post(
f"https://{daily_api_path}/rooms",
headers={"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": room_props
},
)
if res.status_code != 200:
raise Exception(f"Unable to create room: {res.text}")
data = res.json()
room_url: str = data.get("url")
room_name: str = data.get("name")
if room_url is None or room_name is None:
raise Exception("Missing room URL or room name in response")
return room_url, room_name
def get_name_from_url(room_url: str) -> str:
"""
Extracts the name from a given room URL.
Args:
room_url (str): The URL of the room.
Returns:
str: The extracted name from the room URL.
"""
return urllib.parse.urlparse(room_url).path[1:]
def get_token(room_url: str) -> str:
"""
Retrieves a meeting token for the specified Daily room URL.
# See: https://docs.daily.co/reference/rest-api/meeting-tokens
Args:
room_url (str): The URL of the Daily room.
Returns:
str: The meeting token.
Raises:
Exception: If no room URL is specified or if no Daily API key is specified.
Exception: If there is an error creating the meeting token.
"""
if not room_url:
raise Exception(
"No Daily room specified. You must specify a Daily room in order a token to be generated.")
if not daily_api_key:
raise Exception(
"No Daily API key specified. set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers.")
expiration: float = time.time() + 60 * 60
room_name = get_name_from_url(room_url)
res: requests.Response = requests.post(
f"https://{daily_api_path}/meeting-tokens",
headers={
"Authorization": f"Bearer {daily_api_key}"},
json={
"properties": {
"room_name": room_name,
"is_owner": True, # Owner tokens required for transcription
"exp": expiration}},
)
if res.status_code != 200:
raise Exception(
f"Failed to create meeting token: {res.status_code} {res.text}")
token: str = res.json()["token"]
return token

View File

@@ -46,16 +46,10 @@ This project is a FastAPI-based chatbot that integrates with Twilio to handle We
## Configure Twilio URLs
1. **Start ngrok**:
In a new terminal, start ngrok to tunnel the local server:
```sh
ngrok http 8765
```
2. **Update the Twilio Webhook**:
1. **Update the Twilio Webhook**:
Copy the ngrok URL and update your Twilio phone number webhook URL to `http://<ngrok_url>/start_call`.
3. **Update the streams.xml**:
2. **Update the streams.xml**:
Copy the ngrok URL and update templates/streams.xml with `wss://<ngrok_url>/ws`.
## Running the Application
@@ -67,6 +61,11 @@ This project is a FastAPI-based chatbot that integrates with Twilio to handle We
python server.py
```
2. **Start ngrok**:
In a new terminal, start ngrok to tunnel the local server:
```sh
ngrok http 8765
```
### Using Docker
1. **Build the Docker image**:

View File

@@ -15,7 +15,6 @@ from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketTransport, FastAPIWebsocketParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.serializers.twilio import TwilioFrameSerializer
from loguru import logger
@@ -26,7 +25,7 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def run_bot(websocket_client, stream_sid):
async def run_bot(websocket_client):
async with aiohttp.ClientSession() as session:
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
@@ -35,8 +34,7 @@ async def run_bot(websocket_client, stream_sid):
add_wav_header=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
serializer=TwilioFrameSerializer(stream_sid)
vad_audio_passthrough=True
)
)

View File

@@ -1,5 +1,3 @@
import json
import uvicorn
from fastapi import FastAPI, WebSocket
@@ -28,13 +26,8 @@ async def start_call():
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
start_data = websocket.iter_text()
await start_data.__anext__()
call_data = json.loads(await start_data.__anext__())
print(call_data, flush=True)
stream_sid = call_data['start']['streamSid']
print("WebSocket connection accepted")
await run_bot(websocket, stream_sid)
await run_bot(websocket)
if __name__ == "__main__":

View File

@@ -4,10 +4,11 @@
#
# pip-compile --all-extras pyproject.toml
#
aiofiles==24.1.0
aiofiles==23.2.1
# via deepgram-sdk
aiohttp==3.9.5
# via
# cartesia
# deepgram-sdk
# langchain
# langchain-community
@@ -16,7 +17,7 @@ aiosignal==1.3.1
# via aiohttp
annotated-types==0.7.0
# via pydantic
anthropic==0.28.1
anthropic==0.25.9
# via
# openpipe
# pipecat-ai (pyproject.toml)
@@ -35,19 +36,23 @@ attrs==23.2.0
# via
# aiohttp
# openpipe
av==12.3.0
av==12.1.0
# via faster-whisper
azure-cognitiveservices-speech==1.38.0
azure-cognitiveservices-speech==1.37.0
# via pipecat-ai (pyproject.toml)
blinker==1.8.2
# via flask
cachetools==5.4.0
cachetools==5.3.3
# via google-auth
certifi==2024.7.4
cartesia==0.1.1
# via pipecat-ai (pyproject.toml)
certifi==2024.6.2
# via
# httpcore
# httpx
# requests
cffi==1.16.0
# via sounddevice
charset-normalizer==3.3.2
# via requests
click==8.1.7
@@ -59,7 +64,7 @@ coloredlogs==15.0.1
# via onnxruntime
ctranslate2==4.3.1
# via faster-whisper
daily-python==0.10.1
daily-python==0.10.0
# via pipecat-ai (pyproject.toml)
dataclasses-json==0.6.7
# via
@@ -77,17 +82,19 @@ einops==0.8.0
# via pipecat-ai (pyproject.toml)
email-validator==2.2.0
# via fastapi
exceptiongroup==1.2.2
# via anyio
fal-client==0.4.1
exceptiongroup==1.2.1
# via
# anyio
# pytest
fal-client==0.4.0
# via pipecat-ai (pyproject.toml)
fastapi==0.111.1
fastapi==0.111.0
# via pipecat-ai (pyproject.toml)
fastapi-cli==0.0.5
fastapi-cli==0.0.4
# via fastapi
faster-whisper==1.0.3
faster-whisper==1.0.2
# via pipecat-ai (pyproject.toml)
filelock==3.15.4
filelock==3.15.3
# via
# huggingface-hub
# pyht
@@ -106,22 +113,22 @@ frozenlist==1.4.1
# via
# aiohttp
# aiosignal
fsspec==2024.6.1
fsspec==2024.6.0
# via
# huggingface-hub
# torch
future==1.0.0
# via pyloudnorm
google-ai-generativelanguage==0.6.6
google-ai-generativelanguage==0.6.4
# via google-generativeai
google-api-core[grpc]==2.19.1
google-api-core[grpc]==2.19.0
# via
# google-ai-generativelanguage
# google-api-python-client
# google-generativeai
google-api-python-client==2.140.0
google-api-python-client==2.134.0
# via google-generativeai
google-auth==2.33.0
google-auth==2.30.0
# via
# google-ai-generativelanguage
# google-api-core
@@ -130,20 +137,20 @@ google-auth==2.33.0
# google-generativeai
google-auth-httplib2==0.2.0
# via google-api-python-client
google-generativeai==0.7.2
google-generativeai==0.5.4
# via pipecat-ai (pyproject.toml)
googleapis-common-protos==1.63.2
googleapis-common-protos==1.63.1
# via
# google-api-core
# grpcio-status
greenlet==3.0.3
# via sqlalchemy
grpcio==1.65.4
grpcio==1.64.1
# via
# google-api-core
# grpcio-status
# pyht
grpcio-status==1.62.3
grpcio-status==1.62.2
# via google-api-core
h11==0.14.0
# via
@@ -160,6 +167,7 @@ httptools==0.6.1
httpx==0.27.0
# via
# anthropic
# cartesia
# deepgram-sdk
# fal-client
# fastapi
@@ -167,7 +175,7 @@ httpx==0.27.0
# openpipe
httpx-sse==0.4.0
# via fal-client
huggingface-hub==0.24.5
huggingface-hub==0.23.4
# via
# faster-whisper
# timm
@@ -182,6 +190,8 @@ idna==3.7
# httpx
# requests
# yarl
iniconfig==2.0.0
# via pytest
itsdangerous==2.2.0
# via flask
jinja2==3.1.4
@@ -189,35 +199,31 @@ jinja2==3.1.4
# fastapi
# flask
# torch
jiter==0.5.0
# via anthropic
jsonpatch==1.33
# via langchain-core
jsonpointer==3.0.0
# via jsonpatch
langchain==0.2.12
langchain==0.2.5
# via
# langchain-community
# pipecat-ai (pyproject.toml)
langchain-community==0.2.11
langchain-community==0.2.5
# via pipecat-ai (pyproject.toml)
langchain-core==0.2.29
langchain-core==0.2.9
# via
# langchain
# langchain-community
# langchain-openai
# langchain-text-splitters
langchain-openai==0.1.20
langchain-openai==0.1.9
# via pipecat-ai (pyproject.toml)
langchain-text-splitters==0.2.2
langchain-text-splitters==0.2.1
# via langchain
langsmith==0.1.98
langsmith==0.1.81
# via
# langchain
# langchain-community
# langchain-core
llvmlite==0.43.0
# via numba
loguru==0.7.2
# via pipecat-ai (pyproject.toml)
markdown-it-py==3.0.0
@@ -240,18 +246,14 @@ mypy-extensions==1.0.0
# via typing-inspect
networkx==3.3
# via torch
numba==0.60.0
# via resampy
numpy==1.26.4
# via
# ctranslate2
# langchain
# langchain-community
# numba
# onnxruntime
# pipecat-ai (pyproject.toml)
# pyloudnorm
# resampy
# scipy
# torchvision
# transformers
@@ -266,7 +268,7 @@ nvidia-cuda-nvrtc-cu12==12.1.105
# via torch
nvidia-cuda-runtime-cu12==12.1.105
# via torch
nvidia-cudnn-cu12==9.1.0.70
nvidia-cudnn-cu12==8.9.2.26
# via torch
nvidia-cufft-cu12==11.0.2.54
# via torch
@@ -280,41 +282,44 @@ nvidia-cusparse-cu12==12.1.0.106
# torch
nvidia-nccl-cu12==2.20.5
# via torch
nvidia-nvjitlink-cu12==12.6.20
nvidia-nvjitlink-cu12==12.5.40
# via
# nvidia-cusolver-cu12
# nvidia-cusparse-cu12
nvidia-nvtx-cu12==12.1.105
# via torch
onnxruntime==1.18.1
# via
# faster-whisper
# silero-vad
openai==1.35.15
onnxruntime==1.18.0
# via faster-whisper
openai==1.26.0
# via
# langchain-openai
# openpipe
# pipecat-ai (pyproject.toml)
openpipe==4.18.0
openpipe==4.14.0
# via pipecat-ai (pyproject.toml)
orjson==3.10.7
# via langsmith
orjson==3.10.5
# via
# fastapi
# langsmith
packaging==24.1
# via
# huggingface-hub
# langchain-core
# marshmallow
# onnxruntime
# pytest
# transformers
pillow==10.3.0
# via
# pipecat-ai (pyproject.toml)
# torchvision
pluggy==1.5.0
# via pytest
proto-plus==1.24.0
# via
# google-ai-generativelanguage
# google-api-core
protobuf==4.25.4
protobuf==4.25.3
# via
# google-ai-generativelanguage
# google-api-core
@@ -333,7 +338,9 @@ pyasn1-modules==0.4.0
# via google-auth
pyaudio==0.2.14
# via pipecat-ai (pyproject.toml)
pydantic==2.8.2
pycparser==2.22
# via cffi
pydantic==2.7.4
# via
# anthropic
# fastapi
@@ -342,7 +349,7 @@ pydantic==2.8.2
# langchain-core
# langsmith
# openai
pydantic-core==2.20.1
pydantic-core==2.18.4
# via pydantic
pygments==2.18.0
# via rich
@@ -352,6 +359,10 @@ pyloudnorm==0.1.1
# via pipecat-ai (pyproject.toml)
pyparsing==3.1.2
# via httplib2
pytest==8.2.2
# via pytest-asyncio
pytest-asyncio==0.23.7
# via cartesia
python-dateutil==2.9.0.post0
# via openpipe
python-dotenv==1.0.1
@@ -360,7 +371,7 @@ python-dotenv==1.0.1
# uvicorn
python-multipart==0.0.9
# via fastapi
pyyaml==6.0.2
pyyaml==6.0.1
# via
# ctranslate2
# huggingface-hub
@@ -370,12 +381,13 @@ pyyaml==6.0.2
# timm
# transformers
# uvicorn
regex==2024.7.24
regex==2024.5.15
# via
# tiktoken
# transformers
requests==2.32.3
# via
# cartesia
# google-api-core
# huggingface-hub
# langchain
@@ -384,22 +396,18 @@ requests==2.32.3
# pyht
# tiktoken
# transformers
resampy==0.4.3
# via pipecat-ai (pyproject.toml)
rich==13.7.1
# via typer
rsa==4.9
# via google-auth
safetensors==0.4.4
safetensors==0.4.3
# via
# timm
# transformers
scipy==1.14.0
scipy==1.13.1
# via pyloudnorm
shellingham==1.5.4
# via typer
silero-vad==5.1
# via pipecat-ai (pyproject.toml)
six==1.16.0
# via python-dateutil
sniffio==1.3.1
@@ -408,17 +416,19 @@ sniffio==1.3.1
# anyio
# httpx
# openai
sqlalchemy==2.0.32
sounddevice==0.4.7
# via pipecat-ai (pyproject.toml)
sqlalchemy==2.0.31
# via
# langchain
# langchain-community
starlette==0.37.2
# via fastapi
sympy==1.13.1
sympy==1.12.1
# via
# onnxruntime
# torch
tenacity==8.5.0
tenacity==8.4.1
# via
# langchain
# langchain-community
@@ -432,17 +442,19 @@ tokenizers==0.19.1
# anthropic
# faster-whisper
# transformers
torch==2.4.0
tomli==2.0.1
# via pytest
torch==2.3.1
# via
# silero-vad
# pipecat-ai (pyproject.toml)
# timm
# torchaudio
# torchvision
torchaudio==2.4.0
# via silero-vad
torchvision==0.19.0
torchaudio==2.3.1
# via pipecat-ai (pyproject.toml)
torchvision==0.18.1
# via timm
tqdm==4.66.5
tqdm==4.66.4
# via
# google-generativeai
# huggingface-hub
@@ -450,7 +462,7 @@ tqdm==4.66.5
# transformers
transformers==4.40.2
# via pipecat-ai (pyproject.toml)
triton==3.0.0
triton==2.3.1
# via torch
typer==0.12.3
# via fastapi-cli
@@ -462,7 +474,6 @@ typing-extensions==4.12.2
# fastapi
# google-generativeai
# huggingface-hub
# langchain-core
# openai
# pipecat-ai (pyproject.toml)
# pydantic
@@ -474,22 +485,23 @@ typing-extensions==4.12.2
# uvicorn
typing-inspect==0.9.0
# via dataclasses-json
ujson==5.10.0
# via fastapi
uritemplate==4.1.1
# via google-api-python-client
urllib3==2.2.2
# via requests
uvicorn[standard]==0.30.5
# via
# fastapi
# fastapi-cli
uvicorn[standard]==0.30.1
# via fastapi
uvloop==0.19.0
# via uvicorn
verboselogs==1.7
# via deepgram-sdk
watchfiles==0.23.0
watchfiles==0.22.0
# via uvicorn
websockets==12.0
# via
# cartesia
# deepgram-sdk
# pipecat-ai (pyproject.toml)
# uvicorn

View File

@@ -1,13 +1,14 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile --all-extras pyproject.toml
#
aiofiles==24.1.0
aiofiles==23.2.1
# via deepgram-sdk
aiohttp==3.9.5
# via
# cartesia
# deepgram-sdk
# langchain
# langchain-community
@@ -16,7 +17,7 @@ aiosignal==1.3.1
# via aiohttp
annotated-types==0.7.0
# via pydantic
anthropic==0.28.1
anthropic==0.25.9
# via
# openpipe
# pipecat-ai (pyproject.toml)
@@ -27,27 +28,27 @@ anyio==4.4.0
# openai
# starlette
# watchfiles
async-timeout==4.0.3
# via
# aiohttp
# langchain
attrs==23.2.0
# via
# aiohttp
# openpipe
av==12.3.0
av==12.1.0
# via faster-whisper
azure-cognitiveservices-speech==1.38.0
azure-cognitiveservices-speech==1.37.0
# via pipecat-ai (pyproject.toml)
blinker==1.8.2
# via flask
cachetools==5.4.0
cachetools==5.3.3
# via google-auth
certifi==2024.7.4
cartesia==0.1.1
# via pipecat-ai (pyproject.toml)
certifi==2024.6.2
# via
# httpcore
# httpx
# requests
cffi==1.16.0
# via sounddevice
charset-normalizer==3.3.2
# via requests
click==8.1.7
@@ -59,7 +60,7 @@ coloredlogs==15.0.1
# via onnxruntime
ctranslate2==4.3.1
# via faster-whisper
daily-python==0.10.1
daily-python==0.10.0
# via pipecat-ai (pyproject.toml)
dataclasses-json==0.6.7
# via
@@ -77,17 +78,15 @@ einops==0.8.0
# via pipecat-ai (pyproject.toml)
email-validator==2.2.0
# via fastapi
exceptiongroup==1.2.2
# via anyio
fal-client==0.4.1
fal-client==0.4.0
# via pipecat-ai (pyproject.toml)
fastapi==0.111.1
fastapi==0.111.0
# via pipecat-ai (pyproject.toml)
fastapi-cli==0.0.5
fastapi-cli==0.0.4
# via fastapi
faster-whisper==1.0.3
faster-whisper==1.0.2
# via pipecat-ai (pyproject.toml)
filelock==3.15.4
filelock==3.15.3
# via
# huggingface-hub
# pyht
@@ -105,22 +104,22 @@ frozenlist==1.4.1
# via
# aiohttp
# aiosignal
fsspec==2024.6.1
fsspec==2024.6.0
# via
# huggingface-hub
# torch
future==1.0.0
# via pyloudnorm
google-ai-generativelanguage==0.6.6
google-ai-generativelanguage==0.6.4
# via google-generativeai
google-api-core[grpc]==2.19.1
google-api-core[grpc]==2.19.0
# via
# google-ai-generativelanguage
# google-api-python-client
# google-generativeai
google-api-python-client==2.140.0
google-api-python-client==2.134.0
# via google-generativeai
google-auth==2.33.0
google-auth==2.30.0
# via
# google-ai-generativelanguage
# google-api-core
@@ -129,18 +128,18 @@ google-auth==2.33.0
# google-generativeai
google-auth-httplib2==0.2.0
# via google-api-python-client
google-generativeai==0.7.2
google-generativeai==0.5.4
# via pipecat-ai (pyproject.toml)
googleapis-common-protos==1.63.2
googleapis-common-protos==1.63.1
# via
# google-api-core
# grpcio-status
grpcio==1.65.4
grpcio==1.64.1
# via
# google-api-core
# grpcio-status
# pyht
grpcio-status==1.62.3
grpcio-status==1.62.2
# via google-api-core
h11==0.14.0
# via
@@ -157,6 +156,7 @@ httptools==0.6.1
httpx==0.27.0
# via
# anthropic
# cartesia
# deepgram-sdk
# fal-client
# fastapi
@@ -164,7 +164,7 @@ httpx==0.27.0
# openpipe
httpx-sse==0.4.0
# via fal-client
huggingface-hub==0.24.5
huggingface-hub==0.23.4
# via
# faster-whisper
# timm
@@ -179,6 +179,8 @@ idna==3.7
# httpx
# requests
# yarl
iniconfig==2.0.0
# via pytest
itsdangerous==2.2.0
# via flask
jinja2==3.1.4
@@ -186,35 +188,31 @@ jinja2==3.1.4
# fastapi
# flask
# torch
jiter==0.5.0
# via anthropic
jsonpatch==1.33
# via langchain-core
jsonpointer==3.0.0
# via jsonpatch
langchain==0.2.12
langchain==0.2.5
# via
# langchain-community
# pipecat-ai (pyproject.toml)
langchain-community==0.2.11
langchain-community==0.2.5
# via pipecat-ai (pyproject.toml)
langchain-core==0.2.29
langchain-core==0.2.9
# via
# langchain
# langchain-community
# langchain-openai
# langchain-text-splitters
langchain-openai==0.1.20
langchain-openai==0.1.9
# via pipecat-ai (pyproject.toml)
langchain-text-splitters==0.2.2
langchain-text-splitters==0.2.1
# via langchain
langsmith==0.1.98
langsmith==0.1.81
# via
# langchain
# langchain-community
# langchain-core
llvmlite==0.43.0
# via numba
loguru==0.7.2
# via pipecat-ai (pyproject.toml)
markdown-it-py==3.0.0
@@ -237,50 +235,49 @@ mypy-extensions==1.0.0
# via typing-inspect
networkx==3.3
# via torch
numba==0.60.0
# via resampy
numpy==1.26.4
# via
# ctranslate2
# langchain
# langchain-community
# numba
# onnxruntime
# pipecat-ai (pyproject.toml)
# pyloudnorm
# resampy
# scipy
# torchvision
# transformers
onnxruntime==1.18.1
# via
# faster-whisper
# silero-vad
openai==1.35.15
onnxruntime==1.18.0
# via faster-whisper
openai==1.26.0
# via
# langchain-openai
# openpipe
# pipecat-ai (pyproject.toml)
openpipe==4.18.0
openpipe==4.14.0
# via pipecat-ai (pyproject.toml)
orjson==3.10.7
# via langsmith
orjson==3.10.5
# via
# fastapi
# langsmith
packaging==24.1
# via
# huggingface-hub
# langchain-core
# marshmallow
# onnxruntime
# pytest
# transformers
pillow==10.3.0
# via
# pipecat-ai (pyproject.toml)
# torchvision
pluggy==1.5.0
# via pytest
proto-plus==1.24.0
# via
# google-ai-generativelanguage
# google-api-core
protobuf==4.25.4
protobuf==4.25.3
# via
# google-ai-generativelanguage
# google-api-core
@@ -299,7 +296,9 @@ pyasn1-modules==0.4.0
# via google-auth
pyaudio==0.2.14
# via pipecat-ai (pyproject.toml)
pydantic==2.8.2
pycparser==2.22
# via cffi
pydantic==2.7.4
# via
# anthropic
# fastapi
@@ -308,7 +307,7 @@ pydantic==2.8.2
# langchain-core
# langsmith
# openai
pydantic-core==2.20.1
pydantic-core==2.18.4
# via pydantic
pygments==2.18.0
# via rich
@@ -318,6 +317,10 @@ pyloudnorm==0.1.1
# via pipecat-ai (pyproject.toml)
pyparsing==3.1.2
# via httplib2
pytest==8.2.2
# via pytest-asyncio
pytest-asyncio==0.23.7
# via cartesia
python-dateutil==2.9.0.post0
# via openpipe
python-dotenv==1.0.1
@@ -326,7 +329,7 @@ python-dotenv==1.0.1
# uvicorn
python-multipart==0.0.9
# via fastapi
pyyaml==6.0.2
pyyaml==6.0.1
# via
# ctranslate2
# huggingface-hub
@@ -336,12 +339,13 @@ pyyaml==6.0.2
# timm
# transformers
# uvicorn
regex==2024.7.24
regex==2024.5.15
# via
# tiktoken
# transformers
requests==2.32.3
# via
# cartesia
# google-api-core
# huggingface-hub
# langchain
@@ -350,22 +354,18 @@ requests==2.32.3
# pyht
# tiktoken
# transformers
resampy==0.4.3
# via pipecat-ai (pyproject.toml)
rich==13.7.1
# via typer
rsa==4.9
# via google-auth
safetensors==0.4.4
safetensors==0.4.3
# via
# timm
# transformers
scipy==1.14.0
scipy==1.13.1
# via pyloudnorm
shellingham==1.5.4
# via typer
silero-vad==5.1
# via pipecat-ai (pyproject.toml)
six==1.16.0
# via python-dateutil
sniffio==1.3.1
@@ -374,17 +374,19 @@ sniffio==1.3.1
# anyio
# httpx
# openai
sqlalchemy==2.0.32
sounddevice==0.4.7
# via pipecat-ai (pyproject.toml)
sqlalchemy==2.0.31
# via
# langchain
# langchain-community
starlette==0.37.2
# via fastapi
sympy==1.13.1
sympy==1.12.1
# via
# onnxruntime
# torch
tenacity==8.5.0
tenacity==8.4.1
# via
# langchain
# langchain-community
@@ -398,17 +400,17 @@ tokenizers==0.19.1
# anthropic
# faster-whisper
# transformers
torch==2.4.0
torch==2.3.1
# via
# silero-vad
# pipecat-ai (pyproject.toml)
# timm
# torchaudio
# torchvision
torchaudio==2.4.0
# via silero-vad
torchvision==0.19.0
torchaudio==2.3.1
# via pipecat-ai (pyproject.toml)
torchvision==0.18.1
# via timm
tqdm==4.66.5
tqdm==4.66.4
# via
# google-generativeai
# huggingface-hub
@@ -421,12 +423,10 @@ typer==0.12.3
typing-extensions==4.12.2
# via
# anthropic
# anyio
# deepgram-sdk
# fastapi
# google-generativeai
# huggingface-hub
# langchain-core
# openai
# pipecat-ai (pyproject.toml)
# pydantic
@@ -435,25 +435,25 @@ typing-extensions==4.12.2
# torch
# typer
# typing-inspect
# uvicorn
typing-inspect==0.9.0
# via dataclasses-json
ujson==5.10.0
# via fastapi
uritemplate==4.1.1
# via google-api-python-client
urllib3==2.2.2
# via requests
uvicorn[standard]==0.30.5
# via
# fastapi
# fastapi-cli
uvicorn[standard]==0.30.1
# via fastapi
uvloop==0.19.0
# via uvicorn
verboselogs==1.7
# via deepgram-sdk
watchfiles==0.23.0
watchfiles==0.22.0
# via uvicorn
websockets==12.0
# via
# cartesia
# deepgram-sdk
# pipecat-ai (pyproject.toml)
# uvicorn

View File

@@ -8,7 +8,7 @@ dynamic = ["version"]
description = "An open source framework for voice (and multimodal) assistants"
license = { text = "BSD 2-Clause License" }
readme = "README.md"
requires-python = ">=3.10"
requires-python = ">=3.7"
keywords = ["webrtc", "audio", "video", "ai"]
classifiers = [
"Development Status :: 5 - Production/Stable",
@@ -34,26 +34,24 @@ Source = "https://github.com/pipecat-ai/pipecat"
Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.28.1" ]
azure = [ "azure-cognitiveservices-speech~=1.38.0" ]
cartesia = [ "websockets~=12.0" ]
daily = [ "daily-python~=0.10.1" ]
anthropic = [ "anthropic~=0.25.7" ]
azure = [ "azure-cognitiveservices-speech~=1.37.0" ]
cartesia = [ "numpy~=1.26.0", "sounddevice", "cartesia" ]
daily = [ "daily-python~=0.10.0" ]
deepgram = [ "deepgram-sdk~=3.2.7" ]
examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.1" ]
gladia = [ "websockets~=12.0" ]
google = [ "google-generativeai~=0.7.1" ]
fireworks = [ "openai~=1.35.0" ]
langchain = [ "langchain~=0.2.10", "langchain-community~=0.2.9", "langchain-openai~=0.1.17" ]
fal = [ "fal-client~=0.4.0" ]
google = [ "google-generativeai~=0.5.3" ]
fireworks = [ "openai~=1.26.0" ]
langchain = [ "langchain~=0.2.1", "langchain-community~=0.2.1", "langchain-openai~=0.1.8" ]
local = [ "pyaudio~=0.2.0" ]
moondream = [ "einops~=0.8.0", "timm~=0.9.16", "transformers~=4.40.2" ]
openai = [ "openai~=1.35.0" ]
openpipe = [ "openpipe~=4.18.0" ]
openai = [ "openai~=1.26.0" ]
openpipe = [ "openpipe~=4.14.0" ]
playht = [ "pyht~=0.0.28" ]
silero = [ "silero-vad~=5.1" ]
silero = [ "torch~=2.3.0", "torchaudio~=2.3.0" ]
websocket = [ "websockets~=12.0", "fastapi~=0.111.0" ]
whisper = [ "faster-whisper~=1.0.3" ]
xtts = [ "resampy~=0.4.3" ]
whisper = [ "faster-whisper~=1.0.2" ]
[tool.setuptools.packages.find]
# All the following settings are optional:
@@ -64,4 +62,3 @@ pythonpath = ["src"]
[tool.setuptools_scm]
local_scheme = "no-local-version"
fallback_version = "0.0.0-dev"

View File

@@ -101,7 +101,7 @@ class UserImageRawFrame(ImageRawFrame):
class SpriteFrame(Frame):
"""An animated sprite. Will be shown by the transport if the transport's
camera is enabled. Will play at the framerate specified in the transport's
`camera_out_framerate` constructor parameter.
`fps` constructor parameter.
"""
images: List[ImageRawFrame]
@@ -158,34 +158,6 @@ class LLMMessagesFrame(DataFrame):
messages: List[dict]
@dataclass
class LLMMessagesAppendFrame(DataFrame):
"""A frame containing a list of LLM messages that neeed to be added to the
current context.
"""
messages: List[dict]
@dataclass
class LLMMessagesUpdateFrame(DataFrame):
"""A frame containing a list of new LLM messages. These messages will
replace the current context LLM messages and should generate a new
LLMMessagesFrame.
"""
messages: List[dict]
@dataclass
class TTSSpeakFrame(DataFrame):
"""A frame that contains a text that should be spoken by the TTS in the
pipeline (if any).
"""
text: str
@dataclass
class TransportMessageFrame(DataFrame):
message: Any
@@ -212,6 +184,14 @@ class SystemFrame(Frame):
pass
@dataclass
class StartFrame(SystemFrame):
"""This is the first frame that should be pushed down a pipeline."""
allow_interruptions: bool = False
enable_metrics: bool = False
report_only_initial_ttfb: bool = False
@dataclass
class CancelFrame(SystemFrame):
"""Indicates that a pipeline needs to stop right away."""
@@ -260,24 +240,12 @@ class StopInterruptionFrame(SystemFrame):
pass
@dataclass
class BotInterruptionFrame(SystemFrame):
"""Emitted by when the bot should be interrupted. This will mainly cause the
same actions as if the user interrupted except that the
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
"""
pass
@dataclass
class MetricsFrame(SystemFrame):
"""Emitted by processor that can compute metrics like latencies.
"""
ttfb: List[Mapping[str, Any]] | None = None
processing: List[Mapping[str, Any]] | None = None
tokens: List[Mapping[str, Any]] | None = None
characters: List[Mapping[str, Any]] | None = None
ttfb: Mapping[str, float]
#
# Control frames
@@ -289,15 +257,6 @@ class ControlFrame(Frame):
pass
@dataclass
class StartFrame(ControlFrame):
"""This is the first frame that should be pushed down a pipeline."""
allow_interruptions: bool = False
enable_metrics: bool = False
enable_usage_metrics: bool = False
report_only_initial_ttfb: bool = False
@dataclass
class EndFrame(ControlFrame):
"""Indicates that a pipeline has ended and frame processors and pipelines
@@ -312,13 +271,27 @@ class EndFrame(ControlFrame):
@dataclass
class LLMFullResponseStartFrame(ControlFrame):
"""Used to indicate the beginning of an LLM response. Following by one or
more TextFrame and a final LLMFullResponseEndFrame."""
"""Used to indicate the beginning of a full LLM response. Following
LLMResponseStartFrame, TextFrame and LLMResponseEndFrame for each sentence
until a LLMFullResponseEndFrame."""
pass
@dataclass
class LLMFullResponseEndFrame(ControlFrame):
"""Indicates the end of a full LLM response."""
pass
@dataclass
class LLMResponseStartFrame(ControlFrame):
"""Used to indicate the beginning of an LLM response. Following TextFrames
are part of the LLM response until an LLMResponseEndFrame"""
pass
@dataclass
class LLMResponseEndFrame(ControlFrame):
"""Indicates the end of an LLM response."""
pass
@@ -340,33 +313,6 @@ class UserStoppedSpeakingFrame(ControlFrame):
pass
@dataclass
class BotStartedSpeakingFrame(ControlFrame):
"""Emitted upstream by transport outputs to indicate the bot started speaking.
"""
pass
@dataclass
class BotStoppedSpeakingFrame(ControlFrame):
"""Emitted upstream by transport outputs to indicate the bot stopped speaking.
"""
pass
@dataclass
class BotSpeakingFrame(ControlFrame):
"""Emitted upstream by transport outputs while the bot is still
speaking. This can be used, for example, to detect when a user is idle. That
is, while the bot is speaking we don't want to trigger any user idle timeout
since the user might be listening.
"""
pass
@dataclass
class TTSStartedFrame(ControlFrame):
"""Used to indicate the beginning of a TTS response. Following
@@ -392,17 +338,3 @@ class UserImageRequestFrame(ControlFrame):
def __str__(self):
return f"{self.name}, user: {self.user_id}"
@dataclass
class LLMModelUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new LLM model.
"""
model: str
@dataclass
class TTSVoiceUpdateFrame(ControlFrame):
"""A control frame containing a request to update to a new TTS voice.
"""
voice: str

View File

@@ -64,7 +64,7 @@ class Pipeline(BasePipeline):
services = []
for p in self._processors:
if isinstance(p, BasePipeline):
services.extend(p.processors_with_metrics())
services += p.processors_with_metrics()
elif p.can_generate_metrics():
services.append(p)
return services
@@ -91,7 +91,5 @@ class Pipeline(BasePipeline):
def _link_processors(self):
prev = self._processors[0]
for curr in self._processors[1:]:
prev.set_parent(self)
prev.link(curr)
prev = curr
prev.set_parent(self)

Some files were not shown because too many files have changed in this diff Show More