Compare commits
6 Commits
hush/realt
...
jpt/fastbo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5bd5d22270 | ||
|
|
6ee7932337 | ||
|
|
c407445dd1 | ||
|
|
447f37167e | ||
|
|
354c21500e | ||
|
|
5728e25b5a |
165
examples/fast-chatbot/.gitignore
vendored
Normal file
165
examples/fast-chatbot/.gitignore
vendored
Normal file
@@ -0,0 +1,165 @@
|
|||||||
|
# Byte-compiled / optimized / DLL files
|
||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
*$py.class
|
||||||
|
|
||||||
|
# C extensions
|
||||||
|
*.so
|
||||||
|
|
||||||
|
# Distribution / packaging
|
||||||
|
.Python
|
||||||
|
build/
|
||||||
|
develop-eggs/
|
||||||
|
dist/
|
||||||
|
downloads/
|
||||||
|
eggs/
|
||||||
|
.eggs/
|
||||||
|
lib/
|
||||||
|
lib64/
|
||||||
|
parts/
|
||||||
|
sdist/
|
||||||
|
var/
|
||||||
|
wheels/
|
||||||
|
share/python-wheels/
|
||||||
|
*.egg-info/
|
||||||
|
.installed.cfg
|
||||||
|
*.egg
|
||||||
|
MANIFEST
|
||||||
|
|
||||||
|
# PyInstaller
|
||||||
|
# Usually these files are written by a python script from a template
|
||||||
|
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||||
|
*.manifest
|
||||||
|
*.spec
|
||||||
|
|
||||||
|
# Installer logs
|
||||||
|
pip-log.txt
|
||||||
|
pip-delete-this-directory.txt
|
||||||
|
|
||||||
|
# Unit test / coverage reports
|
||||||
|
htmlcov/
|
||||||
|
.tox/
|
||||||
|
.nox/
|
||||||
|
.coverage
|
||||||
|
.coverage.*
|
||||||
|
.cache
|
||||||
|
nosetests.xml
|
||||||
|
coverage.xml
|
||||||
|
*.cover
|
||||||
|
*.py,cover
|
||||||
|
.hypothesis/
|
||||||
|
.pytest_cache/
|
||||||
|
cover/
|
||||||
|
|
||||||
|
# Translations
|
||||||
|
*.mo
|
||||||
|
*.pot
|
||||||
|
|
||||||
|
# Django stuff:
|
||||||
|
*.log
|
||||||
|
local_settings.py
|
||||||
|
db.sqlite3
|
||||||
|
db.sqlite3-journal
|
||||||
|
|
||||||
|
# Flask stuff:
|
||||||
|
instance/
|
||||||
|
.webassets-cache
|
||||||
|
|
||||||
|
# Scrapy stuff:
|
||||||
|
.scrapy
|
||||||
|
|
||||||
|
# Sphinx documentation
|
||||||
|
docs/_build/
|
||||||
|
|
||||||
|
# PyBuilder
|
||||||
|
.pybuilder/
|
||||||
|
target/
|
||||||
|
|
||||||
|
# Jupyter Notebook
|
||||||
|
.ipynb_checkpoints
|
||||||
|
|
||||||
|
# IPython
|
||||||
|
profile_default/
|
||||||
|
ipython_config.py
|
||||||
|
|
||||||
|
# pyenv
|
||||||
|
# For a library or package, you might want to ignore these files since the code is
|
||||||
|
# intended to run in multiple environments; otherwise, check them in:
|
||||||
|
# .python-version
|
||||||
|
|
||||||
|
# pipenv
|
||||||
|
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||||
|
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||||
|
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||||
|
# install all needed dependencies.
|
||||||
|
#Pipfile.lock
|
||||||
|
|
||||||
|
# poetry
|
||||||
|
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
||||||
|
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||||
|
# commonly ignored for libraries.
|
||||||
|
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
|
||||||
|
#poetry.lock
|
||||||
|
|
||||||
|
# pdm
|
||||||
|
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
|
||||||
|
#pdm.lock
|
||||||
|
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
|
||||||
|
# in version control.
|
||||||
|
# https://pdm.fming.dev/#use-with-ide
|
||||||
|
.pdm.toml
|
||||||
|
|
||||||
|
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
||||||
|
__pypackages__/
|
||||||
|
|
||||||
|
# Celery stuff
|
||||||
|
celerybeat-schedule
|
||||||
|
celerybeat.pid
|
||||||
|
|
||||||
|
# SageMath parsed files
|
||||||
|
*.sage.py
|
||||||
|
|
||||||
|
# Environments
|
||||||
|
.env
|
||||||
|
.venv
|
||||||
|
env/
|
||||||
|
venv/
|
||||||
|
ENV/
|
||||||
|
env.bak/
|
||||||
|
venv.bak/
|
||||||
|
|
||||||
|
# Spyder project settings
|
||||||
|
.spyderproject
|
||||||
|
.spyproject
|
||||||
|
|
||||||
|
# Rope project settings
|
||||||
|
.ropeproject
|
||||||
|
|
||||||
|
# mkdocs documentation
|
||||||
|
/site
|
||||||
|
|
||||||
|
# mypy
|
||||||
|
.mypy_cache/
|
||||||
|
.dmypy.json
|
||||||
|
dmypy.json
|
||||||
|
|
||||||
|
# Pyre type checker
|
||||||
|
.pyre/
|
||||||
|
|
||||||
|
# pytype static type analyzer
|
||||||
|
.pytype/
|
||||||
|
|
||||||
|
# Cython debug symbols
|
||||||
|
cython_debug/
|
||||||
|
|
||||||
|
# PyCharm
|
||||||
|
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
|
||||||
|
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||||
|
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||||
|
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||||
|
#.idea/
|
||||||
|
runpod.toml
|
||||||
|
|
||||||
|
# custom script to recursively upgrade items in requirements.txt
|
||||||
|
upgrade_requirements.py
|
||||||
|
.DS_Store
|
||||||
0
examples/fast-chatbot/README.md
Normal file
0
examples/fast-chatbot/README.md
Normal file
164
examples/fast-chatbot/bot.py
Normal file
164
examples/fast-chatbot/bot.py
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2024, Daily
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: BSD 2-Clause License
|
||||||
|
#
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import aiohttp
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from pydantic import BaseModel, ValidationError
|
||||||
|
|
||||||
|
from pipecat.vad.vad_analyzer import VADParams
|
||||||
|
from pipecat.vad.silero import SileroVADAnalyzer
|
||||||
|
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||||
|
from pipecat.services.openai import OpenAILLMService
|
||||||
|
from pipecat.services.deepgram import DeepgramSTTService
|
||||||
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||||
|
from pipecat.pipeline.runner import PipelineRunner
|
||||||
|
from pipecat.pipeline.pipeline import Pipeline
|
||||||
|
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
|
||||||
|
|
||||||
|
from pipecat.processors.aggregators.llm_response import (
|
||||||
|
LLMAssistantResponseAggregator, LLMUserResponseAggregator
|
||||||
|
)
|
||||||
|
|
||||||
|
from helpers import (
|
||||||
|
ClearableDeepgramTTSService,
|
||||||
|
AudioVolumeTimer,
|
||||||
|
TranscriptionTimingLogger
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
# Load variables from a local .env file; override=True lets values from the
# file replace variables that are already set in the process environment.
load_dotenv(override=True)

# Swap loguru's default sink (handler id 0) for a stderr sink whose level comes
# from LOG_LEVEL (DEBUG when the variable is unset).
logger.remove(0)
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "DEBUG"))
|
||||||
|
|
||||||
|
|
||||||
|
class BotSettings(BaseModel):
    """Settings for one bot session.

    The model is parsed from the JSON string given to this script via
    ``-s/--settings``. Only ``room_url`` and ``room_token`` are required; every
    other field falls back to an environment variable (read once, when this
    module is imported) or to a literal default.
    """

    # Daily room to join and the token used to join it.
    room_url: str
    room_token: str
    bot_name: str = "Pipecat"
    # System prompt that seeds the conversation.
    prompt: Optional[str] = "You are a helpful assistant."

    deepgram_api_key: Optional[str] = os.getenv("DEEPGRAM_API_KEY", None)
    deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE", "aura-asteria-en")
    deepgram_tts_base_url: Optional[str] = os.getenv(
        "DEEPGRAM_TTS_BASE_URL", "https://api.deepgram.com/v1/speak")
    # NOTE(review): this default is the same /v1/speak URL as the TTS default
    # above, which looks like a copy/paste slip for a speech-to-text endpoint.
    # Left unchanged because the URL form the STT service expects cannot be
    # confirmed from this file -- verify against DeepgramSTTService.
    deepgram_stt_base_url: Optional[str] = os.getenv(
        "DEEPGRAM_STT_BASE_URL", "https://api.deepgram.com/v1/speak")

    # No trailing commas here: `x: T = value,` makes the default a 1-tuple.
    openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY", None)
    openai_model: Optional[str] = os.getenv("OPENAI_MODEL", None)
    openai_base_url: Optional[str] = os.getenv("OPENAI_BASE_URL", None)

    # Defaults are not validated by pydantic unless asked to, so convert the
    # environment string explicitly; a blank value falls back to 0.2 seconds.
    vad_stop_secs: Optional[float] = float(os.getenv("VAD_STOP_SECS") or 0.200)
|
||||||
|
|
||||||
|
|
||||||
|
async def main(settings: BotSettings):
    """Join the Daily room described by ``settings`` and run the voice pipeline.

    Builds the transport, speech-to-text, LLM and text-to-speech services,
    wires them into a single pipeline and runs it until an ``EndFrame`` is
    queued (which happens when a participant leaves).

    Args:
        settings: Room credentials, prompt and service configuration.
    """
    # The HTTP session is handed to the TTS service, so the pipeline must run
    # inside this context manager.
    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            settings.room_url,
            settings.room_token,
            settings.bot_name,
            DailyParams(
                audio_out_enabled=True,
                transcription_enabled=False,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(params=VADParams(
                    stop_secs=settings.vad_stop_secs
                )),
                vad_audio_passthrough=True
            )
        )

        stt = DeepgramSTTService(
            name="STT",
            api_key=settings.deepgram_api_key,
            url=settings.deepgram_stt_base_url
        )

        tts = ClearableDeepgramTTSService(
            name="Voice",
            aiohttp_session=session,
            api_key=settings.deepgram_api_key,
            voice=settings.deepgram_voice,
            # Only pass base_url when one is configured, so the service keeps
            # its own default otherwise.
            **({'base_url': url} if (url := settings.deepgram_tts_base_url) else {})
        )

        llm = OpenAILLMService(
            name="LLM",
            api_key=settings.openai_api_key,
            model=settings.openai_model,
            base_url=settings.openai_base_url,
        )

        # Shared conversation history; both aggregators below receive this
        # same list object.
        messages = [
            {
                "role": "system",
                "content": settings.prompt,
            },
        ]

        avt = AudioVolumeTimer()
        tl = TranscriptionTimingLogger(avt)

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        pipeline = Pipeline([
            transport.input(),   # Transport user input
            avt,                 # Audio volume timer
            stt,                 # Speech-to-text
            tl,                  # Transcription timing logger
            tma_in,              # User responses
            llm,                 # LLM
            tts,                 # TTS
            transport.output(),  # Transport bot output
            tma_out,             # Assistant spoken responses
        ])

        task = PipelineTask(
            pipeline,
            PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                report_only_initial_ttfb=True
            ))

        # When the participant leaves, we exit the bot.
        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            await task.queue_frame(EndFrame())

        # When the first participant joins, the bot should introduce itself.
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            # Provide some air whilst tracks subscribe. This must be a
            # non-blocking sleep: time.sleep() here would stall the event loop
            # (and with it the whole pipeline) for two seconds.
            await asyncio.sleep(2)
            messages.append(
                {
                    "role": "system",
                    "content": "Briefly introduce yourself by saying 'hello, I'm FastBot, how can I help you today?'"})
            await task.queue_frames([LLMMessagesFrame(messages)])

        runner = PipelineRunner()
        await runner.run(task)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Entry point: parse the JSON settings blob and run one bot session.
    parser = argparse.ArgumentParser(description="Pipecat Bot")
    parser.add_argument("-s", "--settings", type=str, required=True, help="Pipecat bot settings")

    # parse_known_args() tolerates extra, unrecognised arguments.
    args, unknown = parser.parse_known_args()

    # Keep the try body to the one call that is expected to fail, so a
    # ValidationError raised while the bot is running is not mistaken for bad
    # command-line input.
    try:
        settings = BotSettings.model_validate_json(args.settings)
    except ValidationError as e:
        print(e)
        # Report the failure through the exit status as well.
        sys.exit(1)

    asyncio.run(main(settings))
|
||||||
164
examples/fast-chatbot/bot_runner.py
Normal file
164
examples/fast-chatbot/bot_runner.py
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
#
|
||||||
|
# Copyright (c) 2024, Daily
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: BSD 2-Clause License
|
||||||
|
#
|
||||||
|
|
||||||
|
import os
|
||||||
|
import argparse
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
from pydantic import BaseModel, ValidationError
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
|
||||||
|
|
||||||
|
from fastapi import FastAPI, Request, HTTPException
|
||||||
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
from fastapi.responses import JSONResponse
|
||||||
|
|
||||||
|
from bot import BotSettings
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv(override=True)
|
||||||
|
|
||||||
|
|
||||||
|
# ------------ Configuration ------------ #
|
||||||
|
|
||||||
|
# Passed as the second argument to get_token() for both the bot and the user.
MAX_SESSION_TIME = 5 * 60  # 5 minutes
# Variables that must be present when this module is run as a script.
REQUIRED_ENV_VARS = ['DAILY_API_URL', 'DAILY_API_KEY', 'DEEPGRAM_API_KEY']

# `or` rather than a getenv default: a blank `DAILY_API_URL=` line in .env
# sets the variable to "", which would bypass a getenv default and give the
# helper an empty URL.
daily_rest_helper = DailyRESTHelper(
    os.getenv("DAILY_API_KEY", ""),
    os.getenv("DAILY_API_URL") or 'https://api.daily.co/v1')
|
||||||
|
|
||||||
|
|
||||||
|
class RunnerSettings(BaseModel):
    """Optional per-request overrides accepted in the ``/start_bot`` body.

    Every field is optional. Environment-backed defaults are read once, when
    this module is imported.
    """

    # System prompt forwarded to the bot.
    prompt: Optional[str] = "You are a fast, low-latency chatbot. Your goal is to demonstrate voice-driven AI capabilities at human-like speeds. When introducing yourself briefly mention your goal is to showcase speed and conversational flow. The technology powering you is Daily for transport, Cerebrium for GPU hosting, Llama 3 (8-B version) LLM, and Deepgram for speech-to-text and text-to-speech. You are hosted on the east coast of the United States. Respond to what the user said in a creative and helpful way, but keep responses short and legible. Ensure responses contain only words. Check again that you have not included special characters other than '?' or '!'."

    deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE")
    openai_model: Optional[str] = os.getenv("OPENAI_MODEL", "gpt-4o")
    openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY")

    # Any non-None value makes /start_bot answer {"test": True} without
    # provisioning anything.
    test: Optional[bool] = None
|
||||||
|
|
||||||
|
# ----------------- API ----------------- #
|
||||||
|
|
||||||
|
|
||||||
|
app = FastAPI()

# CORS is left wide open: any origin, method and header, with credentials
# allowed.
app.add_middleware(
    CORSMiddleware,
    **{
        "allow_origins": ["*"],
        "allow_credentials": True,
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    }
)
|
||||||
|
|
||||||
|
# ----------------- Main ----------------- #
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
    """Provision a Daily room, launch a bot process into it and return join details.

    The request body is optional. When present it is validated as
    ``RunnerSettings`` and its values are forwarded to the bot.

    Args:
        request: Incoming HTTP request; its raw body may hold RunnerSettings JSON.

    Returns:
        ``{"test": True}`` when the body sets ``test``; otherwise a JSON object
        with the ``room_url`` and a user ``token`` for that room.

    Raises:
        HTTPException: 400 when the body fails validation; 500 when the room
            cannot be created or found, no token is returned, or the bot
            process cannot be started.
    """
    # Function-scope import: the module's import block does not include sys.
    import sys

    runner_settings = RunnerSettings()
    try:
        request_body = await request.body()
        if request_body:
            runner_settings = RunnerSettings.model_validate_json(request_body)
    except ValidationError as e:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid request: {e}")
    except Exception:
        # Deliberate best effort: if the body cannot be read at all, carry on
        # with the default settings.
        pass

    # Is this a webhook creation request?
    if runner_settings.test is not None:
        return JSONResponse({"test": True})

    # Use specified room URL, or create a new one if not specified
    room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")

    if not room_url:
        params = DailyRoomParams(
            properties=DailyRoomProperties()
        )
        try:
            room: DailyRoomObject = daily_rest_helper.create_room(params=params)
        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"Unable to provision room {e}")
    else:
        # Check passed room URL exists, we should assume that it already has a sip set up
        try:
            room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
        except Exception:
            raise HTTPException(
                status_code=500, detail=f"Room not found: {room_url}")

    # Validate the room before dereferencing it for the token request.
    if not room:
        raise HTTPException(
            status_code=500, detail=f"Room not found: {room_url}")

    # Give the agent a token to join the session
    token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)

    if not token:
        # room_url is empty when the room was created above, so report room.url.
        raise HTTPException(
            status_code=500, detail=f"Failed to get token for room: {room.url}")

    # Spawn a new agent, and join the user session
    try:
        bot_settings = BotSettings(
            room_url=room.url,
            room_token=token,
            prompt=runner_settings.prompt,
            deepgram_voice=runner_settings.deepgram_voice,
            openai_model=runner_settings.openai_model,
            openai_api_key=runner_settings.openai_api_key,
        )
        bot_settings_str = bot_settings.model_dump_json(exclude_none=True)

        # Pass the settings as one argv element and skip the shell entirely.
        # The JSON contains request-controlled text (the prompt), so pasting it
        # into a quoted shell string lets a single quote break out of the
        # quoting and run arbitrary commands.
        subprocess.Popen(
            [sys.executable or "python3", "-m", "bot", "-s", bot_settings_str],
            bufsize=1,
            cwd=os.path.dirname(os.path.abspath(__file__)))
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to start subprocess: {e}")

    # Grab a token for the user to join with
    user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)

    return JSONResponse({
        "room_url": room.url,
        "token": user_token,
    })
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Check environment variables
    for env_var in REQUIRED_ENV_VARS:
        if env_var not in os.environ:
            raise Exception(f"Missing environment variable: {env_var}.")

    parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
    parser.add_argument("--host", type=str,
                        default=os.getenv("HOST", "0.0.0.0"), help="Host address")
    # argparse applies `type` to string defaults, so a PORT taken from the
    # environment is still converted to int.
    parser.add_argument("--port", type=int,
                        default=os.getenv("PORT", 7860), help="Port number")
    # Reload stays on by default. On its own, `store_true` with default=True
    # can never yield False, so --no-reload provides the way to switch it off.
    parser.add_argument("--reload", action="store_true",
                        default=True, help="Reload code on change")
    parser.add_argument("--no-reload", dest="reload", action="store_false",
                        help="Do not reload code on change")

    config = parser.parse_args()

    try:
        import uvicorn

        uvicorn.run(
            "bot_runner:app",
            host=config.host,
            port=config.port,
            reload=config.reload
        )

    except KeyboardInterrupt:
        print("Pipecat runner shutting down...")
|
||||||
12
examples/fast-chatbot/env.example
Normal file
12
examples/fast-chatbot/env.example
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
DAILY_SAMPLE_ROOM_URL= #optional: use the same room each time, or create a new one if unset
|
||||||
|
DAILY_API_KEY=
|
||||||
|
DAILY_API_URL=
|
||||||
|
|
||||||
|
DEEPGRAM_API_KEY=
|
||||||
|
DEEPGRAM_VOICE=
|
||||||
|
DEEPGRAM_STT_BASE_URL=
|
||||||
|
DEEPGRAM_TTS_BASE_URL=
|
||||||
|
|
||||||
|
OPENAI_API_KEY=
|
||||||
|
OPENAI_MODEL=
|
||||||
|
OPENAI_BASE_URL=
|
||||||
267
examples/fast-chatbot/helpers.py
Normal file
267
examples/fast-chatbot/helpers.py
Normal file
@@ -0,0 +1,267 @@
|
|||||||
|
from loguru import logger
|
||||||
|
import asyncio
|
||||||
|
import math
|
||||||
|
import struct
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
|
||||||
|
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||||
|
from pipecat.frames.frames import (
|
||||||
|
Frame,
|
||||||
|
AudioRawFrame,
|
||||||
|
InterimTranscriptionFrame,
|
||||||
|
TranscriptionFrame,
|
||||||
|
TextFrame,
|
||||||
|
StartInterruptionFrame,
|
||||||
|
LLMFullResponseStartFrame,
|
||||||
|
TTSStoppedFrame,
|
||||||
|
MetricsFrame
|
||||||
|
)
|
||||||
|
|
||||||
|
from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
|
||||||
|
from pipecat.services.deepgram import DeepgramTTSService
|
||||||
|
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame
|
||||||
|
|
||||||
|
|
||||||
|
class GreedyLLMAggregator(FrameProcessor):
    """Push an LLM context frame downstream for every final transcription.

    Interim transcriptions are dropped. Each final transcription is appended
    to the trailing "user" message of the context (a new one is started when
    the last message is not from the user), and the whole context is pushed
    as an ``OpenAILLMContextFrame``. All other frames pass through untouched.
    """

    def __init__(self, context: OpenAILLMContext = None, **kwargs):
        super().__init__(**kwargs)
        # A falsy context (including None) is replaced with a fresh one.
        self.context: OpenAILLMContext = context or OpenAILLMContext()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        logger.debug(f"{frame}")

        try:
            # Interim results are swallowed.
            if isinstance(frame, InterimTranscriptionFrame):
                return

            # Anything that is not a final transcription is forwarded as-is.
            if not isinstance(frame, TranscriptionFrame):
                await self.push_frame(frame, direction)
                return

            # Extend the trailing user message, or open a new one.
            continues_user_turn = (
                self.context.messages
                and self.context.messages[-1]["role"] == "user"
            )
            if continues_user_turn:
                user_message = self.context.messages.pop()
            else:
                user_message = {"role": "user", "content": ""}

            user_message["content"] += " " + frame.text
            self.context.messages.append(user_message)

            context_frame = OpenAILLMContextFrame(context=self.context)
            logger.debug(f"pushing frame {context_frame}")
            await self.push_frame(context_frame)
        except Exception as e:
            logger.debug(f"error: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
class ClearableDeepgramTTSService(DeepgramTTSService):
    """Deepgram TTS service that resets ``_current_sentence`` on interruption.

    The constructor is inherited unchanged from ``DeepgramTTSService``. The
    previous ``__init___`` override (three trailing underscores) was never
    invoked by Python and called the non-existent ``super().__init``, so it
    has been removed rather than renamed; construction behaves as before.
    """

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, StartInterruptionFrame):
            # NOTE(review): _current_sentence is presumably the base service's
            # buffer of text awaiting synthesis -- confirm against the pipecat
            # TTS service this class inherits from.
            self._current_sentence = ""
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class BufferedSentence:
    """Audio frames buffered for one sentence, plus the sentence's text frame."""

    # Each instance gets its own list.
    audio_frames: List[AudioRawFrame] = field(default_factory=list)
    # Stays None until the matching text frame has been received.
    text_frame: TextFrame = field(default=None)
|
||||||
|
|
||||||
|
|
||||||
|
class VADGate(FrameProcessor):
    """Buffer TTS output per sentence and release it only while the VAD is quiet.

    Audio frames are collected into ``BufferedSentence`` objects. A background
    task (``push_audio``) pushes a completed sentence downstream when the VAD
    analyzer reports ``VADState.QUIET``, waits for roughly the sentence's
    playout time, optionally records the sentence text in the LLM context, and
    then pushes the text frame.

    NOTE(review): the indentation of this class was reconstructed from a
    flattened listing; the nesting of the ``return`` after the expected-text
    branch and of the final ``push_frame(s.text_frame)`` should be confirmed
    against the original file.
    """

    def __init__(
            self,
            vad_analyzer: VADAnalyzer = None,
            context: OpenAILLMContext = None,
            **kwargs):
        super().__init__(**kwargs)
        self.vad_analyzer = vad_analyzer
        self.context = context

        # Handle of the background push_audio() task, or None when not running.
        self._audio_pusher_task = None
        # Set after a TTSStoppedFrame: the next frame should be the TextFrame
        # that belongs to the sentence being buffered.
        self._expect_text_frame_next = False
        self._sentences: List[BufferedSentence] = []

    # queue output from tts one sentence at a time. associate a buffer of audio frames with the content of
    # each text frame.
    #
    # start a coroutine to service the queue and send sentences down the pipeline when possible.
    #   1. do not send anything when we are not in VADState.QUIET
    #   2. if we are in VADState.QUIET, send a sentence, estimate how long it will take for that sentence
    #      to output, sleep until it's time to send another sentence
    #   3. each time we send a sentence, append it to the conversation context
    #   4. when the sentence buffer becomes empty, cancel the coroutine
    #   5. if we get a new LLMFullResponse, treat that as a cancellation, too
    #
    # NOTE(review): step 4 is not what push_audio() below does -- it keeps
    # polling while the buffer is empty instead of ending.

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Route one frame: buffer TTS audio/text, handle interruptions, forward the rest."""
        await super().process_frame(frame, direction)

        try:

            # A TTSService will emit a series of AudioRawFrame objects, then a TTSStoppedFrame,
            # then a TextFrame.

            if self._expect_text_frame_next:
                self._expect_text_frame_next = False
                if isinstance(frame, TextFrame):
                    # NOTE(review): if an interruption emptied _sentences after
                    # the TTSStoppedFrame, this index raises IndexError; the
                    # handler below then logs it and the frame is dropped.
                    self._sentences[-1].text_frame = frame
                else:
                    logger.debug(f"expected a text frame, but received {frame}")
                    await self.push_frame(frame, direction)
                return
            else:
                if isinstance(frame, TextFrame):
                    logger.error(f"XXXXXXXXXXXXXXXXXXX received a text frame, wasn't expecting it.")

            if isinstance(frame, AudioRawFrame):
                # if our buffer is empty or has a "finished" sentence at the end,
                # then we need to start buffering a new sentence
                if not self._sentences or self._sentences[-1].text_frame:
                    self._sentences.append(BufferedSentence())
                self._sentences[-1].audio_frames.append(frame)
                await self.maybe_start_audio_pusher_task()
                return

            if isinstance(frame, TTSStoppedFrame):
                self._expect_text_frame_next = True
                await self.push_frame(frame, direction)
                return

            # There are two ways we can be interrupted. During greedy inference, a new
            # LLM response can start. Or, during playout, we can get a traditional
            # user interruption frame.
            if (isinstance(frame, LLMFullResponseStartFrame) or
                    isinstance(frame, StartInterruptionFrame)):
                logger.debug(f"{frame} - Handle interruption in VADGate")
                # Discard everything buffered and stop the pusher; it is
                # started again by the next audio frame.
                self._sentences = []
                if self._audio_pusher_task:
                    self._audio_pusher_task.cancel()
                    self._audio_pusher_task = None
                await self.push_frame(frame, direction)
                return

            await self.push_frame(frame, direction)
        except Exception as e:
            logger.debug(f"error: {e}")

    async def maybe_start_audio_pusher_task(self):
        """Start the background push_audio() task unless one is already recorded."""
        try:
            if self._audio_pusher_task:
                return
            self._audio_pusher_task = self.get_event_loop().create_task(self.push_audio())

        except Exception as e:
            logger.debug(f"Exception {e}")

    async def push_audio(self):
        """Poll the sentence buffer and push completed sentences while the VAD is quiet."""
        try:
            while True:
                # Each wait below polls again after 10 ms.
                if not self._sentences:
                    await asyncio.sleep(0.01)
                    continue

                # NOTE(review): reads a private attribute of the analyzer, and
                # vad_analyzer defaults to None. An exception here ends this
                # task through the handler below while _audio_pusher_task
                # stays set, so the task is not started again.
                if self.vad_analyzer._vad_state != VADState.QUIET:
                    await asyncio.sleep(0.01)
                    continue

                # we only want to push completed sentence buffers
                if not self._sentences[0].text_frame:
                    await asyncio.sleep(0.01)
                    continue

                s = self._sentences.pop(0)
                if not s.audio_frames:
                    continue
                sample_rate = s.audio_frames[0].sample_rate
                duration = 0
                logger.debug(f"Pushing {len(s.audio_frames)} audio frames")
                for frame in s.audio_frames:
                    await self.push_frame(frame)
                    # assume linear16 encoding (2 bytes per sample). todo: add some more
                    # metadata to AudioRawFrame, maybe
                    duration += (len(frame.audio) / 2 / frame.num_channels) / sample_rate
                # Wait for the estimated playout time, less 20 ms.
                await asyncio.sleep(duration - 20 / 1000)
                if self.context:
                    logger.debug(f"Appending assistant message to context: [{s.text_frame.text}]")
                    self.context.messages.append(
                        {"role": "assistant", "content": s.text_frame.text}
                    )
                await self.push_frame(s.text_frame)

        except Exception as e:
            logger.debug(f"Exception {e}")
|
||||||
|
|
||||||
|
|
||||||
|
class TranscriptionTimingLogger(FrameProcessor):
    """Report how long after the last audio-volume transition a transcription arrived.

    For every ``TranscriptionFrame`` the time since ``avt.last_transition_ts``
    is logged and pushed downstream as a ``MetricsFrame`` ahead of the frame
    itself. Every frame is forwarded.
    """

    def __init__(self, avt):
        super().__init__()
        self.name = "Transcription"
        # Object exposing last_transition_ts (an AudioVolumeTimer in bot.py).
        self._avt = avt

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        try:
            await super().process_frame(frame, direction)

            if isinstance(frame, TranscriptionFrame):
                await self._emit_timing()

            await self.push_frame(frame, direction)
        except Exception as e:
            logger.debug(f"Exception {e}")

    async def _emit_timing(self):
        """Log and push the time elapsed since the last volume transition."""
        elapsed = time.time() - self._avt.last_transition_ts
        logger.debug(f"Transcription TTF: {elapsed}")
        await self.push_frame(MetricsFrame(ttfb={self.name: elapsed}))
|
||||||
|
|
||||||
|
|
||||||
|
class AudioVolumeTimer(FrameProcessor):
    """Record when the audio level last crossed the speech-volume threshold.

    Every ``AudioRawFrame`` is measured. Whenever the level moves across
    ``_speech_volume_threshold`` in either direction, ``last_transition_ts``
    is set to the current time. All frames are forwarded unchanged.
    """

    def __init__(self):
        super().__init__()
        # time.time() of the last threshold crossing; 0 until one happens.
        self.last_transition_ts = 0
        # Levels are in dB relative to 16-bit full scale.
        self._prev_volume = -80
        self._speech_volume_threshold = -50

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, AudioRawFrame):
            volume = self.calculate_volume(frame)
            threshold = self._speech_volume_threshold
            # A transition is a crossing in either direction: the two levels
            # sit on opposite sides of the threshold.
            if (volume >= threshold) != (self._prev_volume >= threshold):
                self.last_transition_ts = time.time()
            self._prev_volume = volume

        await self.push_frame(frame, direction)

    def calculate_volume(self, frame: AudioRawFrame) -> float:
        """Return the RMS level of ``frame`` in dB relative to 16-bit full scale.

        Args:
            frame: Mono audio frame whose ``audio`` bytes are read as native
                16-bit signed integers.

        Returns:
            The level in dB, or -96 when the frame holds no complete sample or
            is entirely silent.

        Raises:
            ValueError: If the frame does not have exactly one channel.
        """
        if frame.num_channels != 1:
            raise ValueError(f"Expected 1 channel, got {frame.num_channels}")

        # Unpack audio data into 16-bit integers. Only whole samples are
        # read: a trailing odd byte would make struct.unpack raise, and an
        # empty buffer would divide by zero below.
        sample_count = len(frame.audio) // 2
        if sample_count == 0:
            return -96  # Minimum value (almost silent)
        audio_samples = struct.unpack(f"{sample_count}h", frame.audio[:sample_count * 2])

        # Calculate RMS
        sum_squares = sum(sample**2 for sample in audio_samples)
        rms = math.sqrt(sum_squares / sample_count)

        # Convert RMS to decibels (dB)
        # Reference: maximum value for 16-bit audio is 32767
        if rms > 0:
            db = 20 * math.log10(rms / 32767)
        else:
            db = -96  # Minimum value (almost silent)

        return db
|
||||||
6
examples/fast-chatbot/requirements.txt
Normal file
6
examples/fast-chatbot/requirements.txt
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
pipecat-ai[daily,openai,silero,deepgram]
|
||||||
|
fastapi
|
||||||
|
uvicorn
|
||||||
|
requests
|
||||||
|
python-dotenv
|
||||||
|
loguru
|
||||||
Reference in New Issue
Block a user