Compare commits

...

46 Commits

Author SHA1 Message Date
Ubuntu
e4388ad9fc demo fixups 2024-07-04 21:45:57 +00:00
Ubuntu
153c10781d khk fast bot back to classic pipeline 2024-07-01 22:08:49 +00:00
Ubuntu
c7a188cdf8 Merge branch 'khk/vad-gated-demo' of github.com:pipecat-ai/pipecat into khk/vad-gated-demo 2024-07-01 22:07:15 +00:00
Kwindla Hultman Kramer
ca30184237 testing vad-gated-pipeline 2024-07-01 11:49:31 +00:00
Kwindla Hultman Kramer
ae466b07f2 shorter introduction from the bot 2024-07-01 11:49:31 +00:00
Jon Taylor
a5adbb5124 removed space from event handler 2024-07-01 11:49:31 +00:00
Jon Taylor
5bbbc1f849 added pause to start and new intro prompt 2024-07-01 11:49:31 +00:00
Jon Taylor
949e90bc63 removed header comment from bot runner 2024-07-01 11:49:31 +00:00
Jon Taylor
99f8693db9 added VAD stop seconds env 2024-07-01 11:49:31 +00:00
Jon Taylor
675c041e7b prompt tweaks 2024-07-01 11:49:31 +00:00
Jon Taylor
b9617a3fde added fastbot example 2024-07-01 11:49:31 +00:00
Aleix Conchillo Flaqué
8dff460307 Merge pull request #266 from pipecat-ai/aleix/silero-num-frames-fixes
vad: fix Silero VAD required number of frames
2024-06-28 11:25:55 -07:00
Aleix Conchillo Flaqué
cce1ddb183 vad: fix Silero VAD required number of frames 2024-06-28 10:45:48 -07:00
Kwindla Hultman Kramer
9f2741e21c testing vad-gated-pipeline 2024-06-26 16:51:28 -07:00
Kwindla Hultman Kramer
a56def9585 shorter introduction from the bot 2024-06-26 16:22:31 -07:00
Jon Taylor
a4c02b412f removed space from event handler 2024-06-26 16:22:31 -07:00
Jon Taylor
3cf49e5306 added pause to start and new intro prompt 2024-06-26 16:22:31 -07:00
Jon Taylor
b53f8886f1 removed header comment from bot runner 2024-06-26 16:22:31 -07:00
Jon Taylor
ece76d36a3 added VAD stop seconds env 2024-06-26 16:22:31 -07:00
Jon Taylor
3d43683b3d prompt tweaks 2024-06-26 16:22:31 -07:00
Jon Taylor
611790bf05 added fastbot example 2024-06-26 16:22:31 -07:00
Aleix Conchillo Flaqué
8691d14289 Merge pull request #255 from Viking5274/main
Fix twilio error
2024-06-26 10:17:03 -07:00
daniil5701133
dd402da9e5 added handling streamSid after first wss connect
fixx name
2024-06-26 18:56:30 +03:00
Aleix Conchillo Flaqué
2fd04248f1 examples(storytelling-chatbot): upgrade npm vulnerabilities 2024-06-25 22:04:55 -07:00
Aleix Conchillo Flaqué
0ac42006f8 Merge pull request #260 from pipecat-ai/aleix/more-interruption-fixes
more interruption fixes
2024-06-25 21:52:02 -07:00
Aleix Conchillo Flaqué
66e331248d update CHANGELOG for 0.0.34 2024-06-25 21:43:23 -07:00
Aleix Conchillo Flaqué
4be3e8c87d aggregators: revert using intermediate results 2024-06-25 21:33:17 -07:00
Aleix Conchillo Flaqué
dac033fe61 services(azure): allow transcriptions during interruptions
If the user interrupts we can't just discard transcriptions because the user is
actually interrupting and talking.
2024-06-25 21:33:06 -07:00
Aleix Conchillo Flaqué
d302cbb114 services(deepgram): allow transcriptions during interruptions
If the user interrupts we can't just discard transcriptions because the user is
actually interrupting and talking.
2024-06-25 21:32:21 -07:00
Aleix Conchillo Flaqué
e3b407db28 Merge pull request #259 from pipecat-ai/aleix/prepare-0.0.33
update CHANGELOG for 0.0.33
2024-06-25 12:05:07 -07:00
Aleix Conchillo Flaqué
4ef623f09e update CHANGELOG for 0.0.33 2024-06-25 11:53:07 -07:00
Aleix Conchillo Flaqué
253530a63d Merge pull request #258 from pipecat-ai/aleix/upgrade-cartesia-1.0.0
services(cartesia): upgrade to new cartesia 1.0.0
2024-06-25 11:52:04 -07:00
Aleix Conchillo Flaqué
4f38d989f5 services(cartesia): upgrade to new cartesia 1.0.0 2024-06-25 11:51:34 -07:00
Aleix Conchillo Flaqué
84074e90ee Merge pull request #257 from pipecat-ai/aleix/cancel-all-tasks-when-interrutpted
cancel all tasks when interrutpted
2024-06-25 11:16:00 -07:00
Aleix Conchillo Flaqué
38aee7d8f2 services(azure): cancel tasks when interrupted and ignore incoming transcriptions 2024-06-25 11:15:26 -07:00
Aleix Conchillo Flaqué
64198313c6 services(deepgram): cancel tasks when interrupted and ignore incoming transcriptions 2024-06-25 11:15:07 -07:00
Aleix Conchillo Flaqué
d61b6c301c transports(base_input): create push tasks after pushing interruption 2024-06-25 11:15:07 -07:00
Aleix Conchillo Flaqué
83d1931266 Merge pull request #256 from pipecat-ai/aleix/tts-cleanup-when-interrupted
services(tts): strip before TTS and cleanup when interrupted
2024-06-25 11:14:32 -07:00
Aleix Conchillo Flaqué
c31f2ab285 services(tts): strip before TTS and cleanup when interrupted 2024-06-25 11:13:19 -07:00
Aleix Conchillo Flaqué
0ddc5721b4 Merge pull request #252 from pipecat-ai/aleix/daily-check-size-read-audio-frames
transports(daily): always check size of read audio frames
2024-06-25 09:45:05 -07:00
Aleix Conchillo Flaqué
98bd183bc4 pyproject: fix cartesia version and update requirements files 2024-06-25 09:43:54 -07:00
Aleix Conchillo Flaqué
aaa154524c Merge pull request #253 from pipecat-ai/aleix/llm-response-use-intermediate-results
aggregators: uses intermediate results for LLMAssistantResponseAggreg…
2024-06-24 19:21:14 -07:00
Aleix Conchillo Flaqué
beced68337 aggregators: uses intermediate results for LLMAssistantResponseAggregator 2024-06-24 17:33:45 -07:00
Aleix Conchillo Flaqué
94823ab952 transports(daily): always check size of read audio frames 2024-06-24 14:56:24 -07:00
Kwindla Hultman Kramer
0b6a19802f Merge pull request #250 from pipecat-ai/lewis/flush-tts-on-llm-response-end
Flush output from TTSService on LLMFullResponseEndFrame
2024-06-22 20:37:45 -04:00
Lewis Wolfgang
c4a2d2197c Flush output from TTSService on LLMFullResponseEndFrame
To cover cases when the LLM response does not end in punctuation.
2024-06-22 14:57:44 -04:00
27 changed files with 1183 additions and 103 deletions

View File

@@ -5,6 +5,50 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.35] - 2024-06-28
### Changed
- `FastAPIWebsocketParams` now require a serializer.
- `TwilioFrameSerializer` now requires a `streamSid`.
### Fixed
- Silero VAD number of frames needs to be 512 for 16000 sample rate or 256 for
8000 sample rate.
## [0.0.34] - 2024-06-25
### Fixed
- Fixed an issue with asynchronous STT services (Deepgram and Azure) that could
interruptions to ignore transcriptions.
- Fixed an issue introduced in 0.0.33 that would cause the LLM to generate
shorter output.
## [0.0.33] - 2024-06-25
### Changed
- Upgraded to Cartesia's new Python library 1.0.0. `CartesiaTTSService` now
expects a voice ID instead of a voice name (you can get the voice ID from
Cartesia's playground). You can also specify the audio `sample_rate` and
`encoding` instead of the previous `output_format`.
### Fixed
- Fixed an issue with asynchronous STT services (Deepgram and Azure) that could
cause static audio issues and interruptions to not work properly when dealing
with multiple LLMs sentences.
- Fixed an issue that could mix new LLM responses with previous ones when
handling interruptions.
- Fixed a Daily transport blocking situation that occurred while reading audio
frames after a participant left the room. Needs daily-python >= 0.10.1.
## [0.0.32] - 2024-06-22
### Added

165
examples/fast-chatbot/.gitignore vendored Normal file
View File

@@ -0,0 +1,165 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml
# custom script to recursively upgrade items in requirements.py
upgrade_requirements.py
.DS_Store

View File

View File

@@ -0,0 +1,165 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from loguru import logger
import argparse
import asyncio
import aiohttp
import os
import sys
import time
from typing import Optional
from pydantic import BaseModel, ValidationError
from pipecat.vad.vad_analyzer import VADParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.pipeline import Pipeline
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator
)
from helpers import (
ClearableDeepgramTTSService,
AudioVolumeTimer,
TranscriptionTimingLogger
)
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "DEBUG"))
class BotSettings(BaseModel):
room_url: str
room_token: str
bot_name: str = "Pipecat"
prompt: Optional[str] = "You are a helpful assistant."
deepgram_api_key: Optional[str] = os.getenv("DEEPGRAM_API_KEY", None)
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE", "aura-asteria-en")
deepgram_tts_base_url: Optional[str] = os.getenv(
"DEEPGRAM_TTS_BASE_URL", "https://api.deepgram.com/v1/speak")
deepgram_stt_base_url: Optional[str] = os.getenv(
"DEEPGRAM_STT_BASE_URL", "https://api.deepgram.com/v1/speak")
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY", None),
openai_model: Optional[str] = os.getenv("OPENAI_MODEL", None),
openai_base_url: Optional[str] = os.getenv("OPENAI_BASE_URL", None)
vad_stop_secs: Optional[float] = os.getenv("VAD_STOP_SECS", 0.200)
async def main(settings: BotSettings):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
settings.room_url,
settings.room_token,
settings.bot_name,
DailyParams(
audio_out_enabled=True,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(
stop_secs=settings.vad_stop_secs
)),
vad_audio_passthrough=True
)
)
stt = DeepgramSTTService(
name="STT",
api_key=settings.deepgram_api_key,
url=settings.deepgram_stt_base_url
)
tts = ClearableDeepgramTTSService(
name="Voice",
aiohttp_session=session,
api_key=settings.deepgram_api_key,
voice=settings.deepgram_voice,
**({'base_url': url} if (url := settings.deepgram_tts_base_url) else {})
)
llm = OpenAILLMService(
name="Groq Llama 3 70B",
api_key=settings.openai_api_key,
model=settings.openai_model,
base_url=settings.openai_base_url,
)
messages = [
{
"role": "system",
"content": settings.prompt,
},
]
avt = AudioVolumeTimer()
tl = TranscriptionTimingLogger(avt)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
avt, # Audio volume timer
stt, # Speech-to-text
tl, # Transcription timing logger
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True
))
# When the participant leaves, we exit the bot.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# Provide some air whilst tracks subscribe
time.sleep(2)
messages.append(
{
"role": "system",
"content": "Introduce yourself by saying 'hello, I'm FastBot, how can I help you today?'"})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Bot")
parser.add_argument("-s", "--settings", type=str, required=True, help="Pipecat bot settings")
args, unknown = parser.parse_known_args()
try:
settings = BotSettings.model_validate_json(args.settings)
print(f"settings: {settings.json()}")
asyncio.run(main(settings))
except ValidationError as e:
print(e)

View File

@@ -0,0 +1,192 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from loguru import logger
import argparse
import asyncio
import aiohttp
import os
import sys
import time
from typing import Optional
from pydantic import BaseModel, ValidationError
from pipecat.vad.vad_analyzer import VADParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.openai import OpenAILLMService, OpenAILLMContext
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.pipeline import Pipeline
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator
)
from helpers import (
GreedyLLMAggregator,
ClearableDeepgramTTSService,
VADGate,
AudioVolumeTimer,
TranscriptionTimingLogger
)
# from helpers import (
# ClearableDeepgramTTSService,
# AudioVolumeTimer,
# TranscriptionTimingLogger
# )
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "DEBUG"))
class BotSettings(BaseModel):
room_url: str
room_token: str
bot_name: str = "Pipecat"
prompt: Optional[str] = "You are a helpful assistant."
deepgram_api_key: Optional[str] = os.getenv("DEEPGRAM_API_KEY", None)
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE", "aura-asteria-en")
deepgram_tts_base_url: Optional[str] = os.getenv(
"DEEPGRAM_TTS_BASE_URL", "https://api.deepgram.com/v1/speak")
deepgram_stt_base_url: Optional[str] = os.getenv(
"DEEPGRAM_STT_BASE_URL", "https://api.deepgram.com/v1/speak")
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY", None),
openai_model: Optional[str] = os.getenv("OPENAI_MODEL", None),
openai_base_url: Optional[str] = os.getenv("OPENAI_BASE_URL", None)
vad_stop_secs: Optional[float] = os.getenv("VAD_STOP_SECS", 0.200)
async def main(settings: BotSettings):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
settings.room_url,
settings.room_token,
settings.bot_name,
DailyParams(
audio_out_enabled=True,
transcription_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(
stop_secs=settings.vad_stop_secs
)),
vad_audio_passthrough=True
)
)
stt = DeepgramSTTService(
name="STT",
api_key=settings.deepgram_api_key,
url=settings.deepgram_stt_base_url
)
tts = ClearableDeepgramTTSService(
name="Voice",
aiohttp_session=session,
api_key=settings.deepgram_api_key,
voice=settings.deepgram_voice,
**({'base_url': url} if (url := settings.deepgram_tts_base_url) else {})
)
llm = OpenAILLMService(
name="LLM",
api_key=settings.openai_api_key,
model=settings.openai_model,
base_url=settings.openai_base_url,
)
messages = [
{
"role": "system",
"content": settings.prompt,
},
]
# avt = AudioVolumeTimer()
# tl = TranscriptionTimingLogger(avt)
# tma_in = LLMUserResponseAggregator(messages)
# tma_out = LLMAssistantResponseAggregator(messages)
# pipeline = Pipeline([
# transport.input(), # Transport user input
# avt, # Audio volume timer
# stt, # Speech-to-text
# tl, # Transcription timing logger
# tma_in, # User responses
# llm, # LLM
# tts, # TTS
# transport.output(), # Transport bot output
# tma_out, # Assistant spoken responses
# ])
ctx = OpenAILLMContext()
greedy = GreedyLLMAggregator(name="greedy", context=ctx)
gate = VADGate(name="gate", vad_analyzer=transport.input().vad_analyzer(), context=ctx)
avt = AudioVolumeTimer()
tl = TranscriptionTimingLogger(avt)
pipeline = Pipeline([
transport.input(), # Transport user input
avt,
stt,
tl,
greedy,
llm, # LLM
tts, # TTS
gate,
transport.output(), # Transport bot output
# FrameLogger()
])
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True
))
# When the participant leaves, we exit the bot.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# Provide some air whilst tracks subscribe
time.sleep(2)
messages.append(
{
"role": "system",
"content": "Introduce yourself by saying 'hello, I'm FastBot, how can I help you today?'"})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Bot")
parser.add_argument("-s", "--settings", type=str, required=True, help="Pipecat bot settings")
args, unknown = parser.parse_known_args()
try:
settings = BotSettings.model_validate_json(args.settings)
# print(f"settings: {settings.json()}")
asyncio.run(main(settings))
except ValidationError as e:
print(e)

View File

@@ -0,0 +1 @@
bot-classic-pipeline.js

View File

@@ -0,0 +1,164 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import argparse
import subprocess
from pydantic import BaseModel, ValidationError
from typing import Optional
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from bot import BotSettings
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = ['DAILY_API_URL', 'DAILY_API_KEY', 'DEEPGRAM_API_KEY']
daily_rest_helper = DailyRESTHelper(
os.getenv("DAILY_API_KEY", ""),
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
class RunnerSettings(BaseModel):
prompt: Optional[
str] = "You are a fast, low-latency chatbot. Your goal is to demonstrate voice-driven AI capabilities at human-like speeds. The technology powering you is Daily for transport, Groq for AI inference, Llama 3 (70-B version) LLM, and Deepgram for speech-to-text and text-to-speech. You are running on servers in Oregon. Respond to what the user said in a creative and helpful way, but keep responses short and legible. Ensure responses contain only words. Check again that you have not included special characters other than '?' or '!'."
deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE")
openai_model: Optional[str] = os.getenv("OPENAI_MODEL", "gpt-4o")
openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY")
test: Optional[bool] = None
# ----------------- API ----------------- #
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)
# ----------------- Main ----------------- #
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
runner_settings = RunnerSettings()
try:
request_body = await request.body()
if len(request_body) > 0:
runner_settings = RunnerSettings.model_validate_json(request_body)
except ValidationError as e:
raise HTTPException(
status_code=400,
detail=f"Invalid request: {e}")
except Exception as e:
# If no data in request, pass
pass
# Is this a webhook creation request?
if runner_settings.test is not None:
return JSONResponse({"test": True})
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
try:
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Spawn a new agent, and join the user session
try:
bot_settings = BotSettings(
room_url=room.url,
room_token=token,
prompt=runner_settings.prompt,
deepgram_voice=runner_settings.deepgram_voice,
openai_model=runner_settings.openai_model,
openai_api_key=runner_settings.openai_api_key,
)
bot_settings_str = bot_settings.model_dump_json(exclude_none=True)
subprocess.Popen(
[f"python3 -m bot -s '{bot_settings_str}'"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
# Grab a token for the user to join with
user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
if __name__ == "__main__":
# Check environment variables
for env_var in REQUIRED_ENV_VARS:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=True, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")

View File

@@ -0,0 +1,12 @@
DAILY_SAMPLE_ROOM_URL= #optional: use the same room each time, or create a new one if unset
DAILY_API_KEY=
DAILY_API_URL=
DEEPGRAM_API_KEY=
DEEPGRAM_VOICE=
DEEPGRAM_STT_URL=
DEEPGRAM_TTS_BASE_URL=
OPENAI_API_KEY=
OPENAI_MODEL=
OPENAI_BASE_URL=

View File

@@ -0,0 +1,267 @@
from loguru import logger
import asyncio
import math
import struct
import time
from dataclasses import dataclass, field
from typing import List
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
Frame,
AudioRawFrame,
InterimTranscriptionFrame,
TranscriptionFrame,
TextFrame,
StartInterruptionFrame,
LLMFullResponseStartFrame,
TTSStoppedFrame,
MetricsFrame
)
from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame
class GreedyLLMAggregator(FrameProcessor):
def __init__(self, context: OpenAILLMContext = None, **kwargs):
super().__init__(**kwargs)
self.context: OpenAILLMContext = context if context else OpenAILLMContext()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
logger.debug(f"{frame}")
try:
if isinstance(frame, InterimTranscriptionFrame):
return
if isinstance(frame, TranscriptionFrame):
# append transcribed text to last "user" frame
if self.context.messages and self.context.messages[-1]["role"] == "user":
last_frame = self.context.messages.pop()
else:
last_frame = {"role": "user", "content": ""}
last_frame["content"] += " " + frame.text
self.context.messages.append(last_frame)
oai_context_frame = OpenAILLMContextFrame(context=self.context)
logger.debug(f"pushing frame {oai_context_frame}")
await self.push_frame(oai_context_frame)
return
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"error: {e}")
class ClearableDeepgramTTSService(DeepgramTTSService):
def __init___(self, **kwargs):
super().__init(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
self._current_sentence = ""
@dataclass
class BufferedSentence:
audio_frames: List[AudioRawFrame] = field(default_factory=list)
text_frame: TextFrame = None
class VADGate(FrameProcessor):
def __init__(
self,
vad_analyzer: VADAnalyzer = None,
context: OpenAILLMContext = None,
**kwargs):
super().__init__(**kwargs)
self.vad_analyzer = vad_analyzer
self.context = context
self._audio_pusher_task = None
self._expect_text_frame_next = False
self._sentences: List[BufferedSentence] = []
# queue output from tts one sentence at a time. associate a buffer of audio frames with the content of
# each text frame.
#
# start a coroutine to service the queue and send sentences down the pipeline when possible.
# 1. do not send anything when we are not in VADState.QUIET
# 2. if we are in VADState.QUIET, send a sentence, estimate how long it will take for that sentence
# to output, sleep until it's time to send another sentence
# 3. each time we send a sentence, append it to the conversation context
# 3. when the sentence buffer becomes empty, cancel the coroutine
# 4. if we get a new LLMFullResponse, treat that as a cancellation, too
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
try:
# A TTSService will emit a series of AudioRawFrame objects, then a TTSStoppedFrame,
# then a TextFrame.
if self._expect_text_frame_next:
self._expect_text_frame_next = False
if isinstance(frame, TextFrame):
self._sentences[-1].text_frame = frame
else:
logger.debug(f"expected a text frame, but received {frame}")
await self.push_frame(frame, direction)
return
else:
if isinstance(frame, TextFrame):
logger.error(f"XXXXXXXXXXXXXXXXXXX received a text frame, wasn't expecting it.")
if isinstance(frame, AudioRawFrame):
# if our buffer is empty or has a "finished" sentence at the end,
# then we need to start buffering a new sentence
if not self._sentences or self._sentences[-1].text_frame:
self._sentences.append(BufferedSentence())
self._sentences[-1].audio_frames.append(frame)
await self.maybe_start_audio_pusher_task()
return
if isinstance(frame, TTSStoppedFrame):
self._expect_text_frame_next = True
await self.push_frame(frame, direction)
return
# There are two ways we can be interrupted. During greedy inference, a new
# LLM response can start. Or, during playout, we can get a traditional
# user interruption frame.
if (isinstance(frame, LLMFullResponseStartFrame) or
isinstance(frame, StartInterruptionFrame)):
logger.debug(f"{frame} - Handle interruption in VADGate")
self._sentences = []
if self._audio_pusher_task:
self._audio_pusher_task.cancel()
self._audio_pusher_task = None
await self.push_frame(frame, direction)
return
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"error: {e}")
async def maybe_start_audio_pusher_task(self):
try:
if self._audio_pusher_task:
return
self._audio_pusher_task = self.get_event_loop().create_task(self.push_audio())
except Exception as e:
logger.debug(f"Exception {e}")
async def push_audio(self):
try:
while True:
if not self._sentences:
await asyncio.sleep(0.01)
continue
if self.vad_analyzer._vad_state != VADState.QUIET:
await asyncio.sleep(0.01)
continue
# we only want to push completed sentence buffers
if not self._sentences[0].text_frame:
await asyncio.sleep(0.01)
continue
s = self._sentences.pop(0)
if not s.audio_frames:
continue
sample_rate = s.audio_frames[0].sample_rate
duration = 0
logger.debug(f"Pushing {len(s.audio_frames)} audio frames")
for frame in s.audio_frames:
await self.push_frame(frame)
# assume linear16 encoding (2 bytes per sample). todo: add some more
# metadata to AudioRawFrame, maybe
duration += (len(frame.audio) / 2 / frame.num_channels) / sample_rate
await asyncio.sleep(duration - 20 / 1000)
if self.context:
logger.debug(f"Appending assistant message to context: [{s.text_frame.text}]")
self.context.messages.append(
{"role": "assistant", "content": s.text_frame.text}
)
await self.push_frame(s.text_frame)
except Exception as e:
logger.debug(f"Exception {e}")
class TranscriptionTimingLogger(FrameProcessor):
def __init__(self, avt):
super().__init__()
self.name = "Transcription"
self._avt = avt
async def process_frame(self, frame: Frame, direction: FrameDirection):
try:
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
elapsed = time.time() - self._avt.last_transition_ts
logger.debug(f"Transcription TTF: {elapsed}")
await self.push_frame(MetricsFrame(ttfb={self.name: elapsed}))
await self.push_frame(frame, direction)
except Exception as e:
logger.debug(f"Exception {e}")
class AudioVolumeTimer(FrameProcessor):
def __init__(self):
super().__init__()
self.last_transition_ts = 0
self._prev_volume = -80
self._speech_volume_threshold = -50
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, AudioRawFrame):
volume = self.calculate_volume(frame)
# print(f"Audio volume: {volume:.2f} dB")
if (volume >= self._speech_volume_threshold and
self._prev_volume < self._speech_volume_threshold):
# logger.debug("transition above speech volume threshold")
self.last_transition_ts = time.time()
elif (volume < self._speech_volume_threshold and
self._prev_volume >= self._speech_volume_threshold):
# logger.debug("transition below non-speech volume threshold")
self.last_transition_ts = time.time()
self._prev_volume = volume
await self.push_frame(frame, direction)
def calculate_volume(self, frame: AudioRawFrame) -> float:
if frame.num_channels != 1:
raise ValueError(f"Expected 1 channel, got {frame.num_channels}")
# Unpack audio data into 16-bit integers
fmt = f"{len(frame.audio) // 2}h"
audio_samples = struct.unpack(fmt, frame.audio)
# Calculate RMS
sum_squares = sum(sample**2 for sample in audio_samples)
rms = math.sqrt(sum_squares / len(audio_samples))
# Convert RMS to decibels (dB)
# Reference: maximum value for 16-bit audio is 32767
if rms > 0:
db = 20 * math.log10(rms / 32767)
else:
db = -96 # Minimum value (almost silent)
return db

View File

@@ -0,0 +1,6 @@
pipecat-ai[daily,openai,silero,deepgram]
fastapi
uvicorn
requests
python-dotenv
loguru

View File

@@ -38,7 +38,6 @@ async def main(room_url: str, token):
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=44100,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
@@ -47,8 +46,7 @@ async def main(room_url: str, token):
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="British Lady",
output_format="pcm_44100"
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
)
llm = OpenAILLMService(

View File

@@ -66,7 +66,6 @@ async def main(room_url: str, token):
"Pipecat",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=44100,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
@@ -75,20 +74,17 @@ async def main(room_url: str, token):
news_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="Newslady",
output_format="pcm_44100"
voice_id="bf991597-6c13-47e4-8411-91ec2de5c466", # Newslady
)
british_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="British Lady",
output_format="pcm_44100"
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
barbershop_man = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="Barbershop Man",
output_format="pcm_44100"
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
)
llm = OpenAILLMService(

View File

@@ -899,11 +899,11 @@ brace-expansion@^2.0.1:
balanced-match "^1.0.0"
braces@^3.0.2, braces@~3.0.2:
version "3.0.2"
resolved "https://registry.yarnpkg.com/braces/-/braces-3.0.2.tgz#3454e1a462ee8d599e236df336cd9ea4f8afe107"
integrity sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==
version "3.0.3"
resolved "https://registry.yarnpkg.com/braces/-/braces-3.0.3.tgz#490332f40919452272d55a8480adc0c441358789"
integrity "sha1-SQMy9AkZRSJy1VqEgK3AxEE1h4k= sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA=="
dependencies:
fill-range "^7.0.1"
fill-range "^7.1.1"
browserslist@^4.23.0:
version "4.23.0"
@@ -1551,10 +1551,10 @@ file-entry-cache@^6.0.1:
dependencies:
flat-cache "^3.0.4"
fill-range@^7.0.1:
version "7.0.1"
resolved "https://registry.yarnpkg.com/fill-range/-/fill-range-7.0.1.tgz#1919a6a7c75fe38b2c7c77e5198535da9acdda40"
integrity sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==
fill-range@^7.1.1:
version "7.1.1"
resolved "https://registry.yarnpkg.com/fill-range/-/fill-range-7.1.1.tgz#44265d3cac07e3ea7dc247516380643754a05292"
integrity "sha1-RCZdPKwH4+p9wkdRY4BkN1SgUpI= sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg=="
dependencies:
to-regex-range "^5.0.1"

View File

@@ -15,6 +15,7 @@ from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketTransport, FastAPIWebsocketParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.serializers.twilio import TwilioFrameSerializer
from loguru import logger
@@ -25,7 +26,7 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def run_bot(websocket_client):
async def run_bot(websocket_client, stream_sid):
async with aiohttp.ClientSession() as session:
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
@@ -34,7 +35,8 @@ async def run_bot(websocket_client):
add_wav_header=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True
vad_audio_passthrough=True,
serializer=TwilioFrameSerializer(stream_sid)
)
)

View File

@@ -1,3 +1,5 @@
import json
import uvicorn
from fastapi import FastAPI, WebSocket
@@ -26,8 +28,13 @@ async def start_call():
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
start_data = websocket.iter_text()
await start_data.__anext__()
call_data = json.loads(await start_data.__anext__())
print(call_data, flush=True)
stream_sid = call_data['start']['streamSid']
print("WebSocket connection accepted")
await run_bot(websocket)
await run_bot(websocket, stream_sid)
if __name__ == "__main__":

View File

@@ -4,7 +4,7 @@
#
# pip-compile --all-extras pyproject.toml
#
aiofiles==23.2.1
aiofiles==24.1.0
# via deepgram-sdk
aiohttp==3.9.5
# via
@@ -44,15 +44,13 @@ blinker==1.8.2
# via flask
cachetools==5.3.3
# via google-auth
cartesia==0.1.1
cartesia==1.0.0
# via pipecat-ai (pyproject.toml)
certifi==2024.6.2
# via
# httpcore
# httpx
# requests
cffi==1.16.0
# via sounddevice
charset-normalizer==3.3.2
# via requests
click==8.1.7
@@ -64,7 +62,7 @@ coloredlogs==15.0.1
# via onnxruntime
ctranslate2==4.3.1
# via faster-whisper
daily-python==0.10.0
daily-python==0.10.1
# via pipecat-ai (pyproject.toml)
dataclasses-json==0.6.7
# via
@@ -94,7 +92,7 @@ fastapi-cli==0.0.4
# via fastapi
faster-whisper==1.0.2
# via pipecat-ai (pyproject.toml)
filelock==3.15.3
filelock==3.15.4
# via
# huggingface-hub
# pyht
@@ -121,7 +119,7 @@ future==1.0.0
# via pyloudnorm
google-ai-generativelanguage==0.6.4
# via google-generativeai
google-api-core[grpc]==2.19.0
google-api-core[grpc]==2.19.1
# via
# google-ai-generativelanguage
# google-api-python-client
@@ -139,7 +137,7 @@ google-auth-httplib2==0.2.0
# via google-api-python-client
google-generativeai==0.5.4
# via pipecat-ai (pyproject.toml)
googleapis-common-protos==1.63.1
googleapis-common-protos==1.63.2
# via
# google-api-core
# grpcio-status
@@ -219,7 +217,7 @@ langchain-openai==0.1.9
# via pipecat-ai (pyproject.toml)
langchain-text-splitters==0.2.1
# via langchain
langsmith==0.1.81
langsmith==0.1.82
# via
# langchain
# langchain-community
@@ -338,8 +336,6 @@ pyasn1-modules==0.4.0
# via google-auth
pyaudio==0.2.14
# via pipecat-ai (pyproject.toml)
pycparser==2.22
# via cffi
pydantic==2.7.4
# via
# anthropic
@@ -404,7 +400,7 @@ safetensors==0.4.3
# via
# timm
# transformers
scipy==1.13.1
scipy==1.14.0
# via pyloudnorm
shellingham==1.5.4
# via typer
@@ -416,8 +412,6 @@ sniffio==1.3.1
# anyio
# httpx
# openai
sounddevice==0.4.7
# via pipecat-ai (pyproject.toml)
sqlalchemy==2.0.31
# via
# langchain
@@ -428,7 +422,7 @@ sympy==1.12.1
# via
# onnxruntime
# torch
tenacity==8.4.1
tenacity==8.4.2
# via
# langchain
# langchain-community

View File

@@ -1,10 +1,10 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --all-extras pyproject.toml
#
aiofiles==23.2.1
aiofiles==24.1.0
# via deepgram-sdk
aiohttp==3.9.5
# via
@@ -28,6 +28,10 @@ anyio==4.4.0
# openai
# starlette
# watchfiles
async-timeout==4.0.3
# via
# aiohttp
# langchain
attrs==23.2.0
# via
# aiohttp
@@ -40,15 +44,13 @@ blinker==1.8.2
# via flask
cachetools==5.3.3
# via google-auth
cartesia==0.1.1
cartesia==1.0.0
# via pipecat-ai (pyproject.toml)
certifi==2024.6.2
# via
# httpcore
# httpx
# requests
cffi==1.16.0
# via sounddevice
charset-normalizer==3.3.2
# via requests
click==8.1.7
@@ -60,7 +62,7 @@ coloredlogs==15.0.1
# via onnxruntime
ctranslate2==4.3.1
# via faster-whisper
daily-python==0.10.0
daily-python==0.10.1
# via pipecat-ai (pyproject.toml)
dataclasses-json==0.6.7
# via
@@ -78,6 +80,10 @@ einops==0.8.0
# via pipecat-ai (pyproject.toml)
email-validator==2.2.0
# via fastapi
exceptiongroup==1.2.1
# via
# anyio
# pytest
fal-client==0.4.0
# via pipecat-ai (pyproject.toml)
fastapi==0.111.0
@@ -86,7 +92,7 @@ fastapi-cli==0.0.4
# via fastapi
faster-whisper==1.0.2
# via pipecat-ai (pyproject.toml)
filelock==3.15.3
filelock==3.15.4
# via
# huggingface-hub
# pyht
@@ -112,7 +118,7 @@ future==1.0.0
# via pyloudnorm
google-ai-generativelanguage==0.6.4
# via google-generativeai
google-api-core[grpc]==2.19.0
google-api-core[grpc]==2.19.1
# via
# google-ai-generativelanguage
# google-api-python-client
@@ -130,7 +136,7 @@ google-auth-httplib2==0.2.0
# via google-api-python-client
google-generativeai==0.5.4
# via pipecat-ai (pyproject.toml)
googleapis-common-protos==1.63.1
googleapis-common-protos==1.63.2
# via
# google-api-core
# grpcio-status
@@ -204,11 +210,11 @@ langchain-core==0.2.9
# langchain-community
# langchain-openai
# langchain-text-splitters
langchain-openai==0.1.9
langchain-openai==0.1.10
# via pipecat-ai (pyproject.toml)
langchain-text-splitters==0.2.1
# via langchain
langsmith==0.1.81
langsmith==0.1.82
# via
# langchain
# langchain-community
@@ -296,8 +302,6 @@ pyasn1-modules==0.4.0
# via google-auth
pyaudio==0.2.14
# via pipecat-ai (pyproject.toml)
pycparser==2.22
# via cffi
pydantic==2.7.4
# via
# anthropic
@@ -362,7 +366,7 @@ safetensors==0.4.3
# via
# timm
# transformers
scipy==1.13.1
scipy==1.14.0
# via pyloudnorm
shellingham==1.5.4
# via typer
@@ -374,8 +378,6 @@ sniffio==1.3.1
# anyio
# httpx
# openai
sounddevice==0.4.7
# via pipecat-ai (pyproject.toml)
sqlalchemy==2.0.31
# via
# langchain
@@ -386,7 +388,7 @@ sympy==1.12.1
# via
# onnxruntime
# torch
tenacity==8.4.1
tenacity==8.4.2
# via
# langchain
# langchain-community
@@ -400,6 +402,8 @@ tokenizers==0.19.1
# anthropic
# faster-whisper
# transformers
tomli==2.0.1
# via pytest
torch==2.3.1
# via
# pipecat-ai (pyproject.toml)
@@ -423,6 +427,7 @@ typer==0.12.3
typing-extensions==4.12.2
# via
# anthropic
# anyio
# deepgram-sdk
# fastapi
# google-generativeai
@@ -435,6 +440,7 @@ typing-extensions==4.12.2
# torch
# typer
# typing-inspect
# uvicorn
typing-inspect==0.9.0
# via dataclasses-json
ujson==5.10.0

View File

@@ -36,8 +36,8 @@ Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.25.7" ]
azure = [ "azure-cognitiveservices-speech~=1.37.0" ]
cartesia = [ "numpy~=1.26.0", "sounddevice", "cartesia" ]
daily = [ "daily-python~=0.10.0" ]
cartesia = [ "cartesia~=1.0.0" ]
daily = [ "daily-python~=0.10.1" ]
deepgram = [ "deepgram-sdk~=3.2.7" ]
examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.0" ]

View File

@@ -17,8 +17,8 @@ class TwilioFrameSerializer(FrameSerializer):
AudioRawFrame: "audio",
}
def __init__(self):
self._sid = None
def __init__(self, stream_sid: str):
self._stream_sid = stream_sid
def serialize(self, frame: Frame) -> str | bytes | None:
if not isinstance(frame, AudioRawFrame):
@@ -30,7 +30,7 @@ class TwilioFrameSerializer(FrameSerializer):
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "media",
"streamSid": self._sid,
"streamSid": self._stream_sid,
"media": {
"payload": payload
}
@@ -41,9 +41,6 @@ class TwilioFrameSerializer(FrameSerializer):
def deserialize(self, data: str | bytes) -> Frame | None:
message = json.loads(data)
if not self._sid:
self._sid = message["streamSid"] if "streamSid" in message else None
if message["event"] != "media":
return None
else:

View File

@@ -16,7 +16,9 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSStartedFrame,
TTSStoppedFrame,
TextFrame,
@@ -113,13 +115,17 @@ class TTSService(AIService):
if self._current_sentence.strip().endswith(
(".", "?", "!")) and not self._current_sentence.strip().endswith(
("Mr,", "Mrs.", "Ms.", "Dr.")):
text = self._current_sentence.strip()
text = self._current_sentence
self._current_sentence = ""
if text:
await self._push_tts_frames(text)
async def _push_tts_frames(self, text: str):
text = text.strip()
if not text:
return
await self.push_frame(TTSStartedFrame())
await self.process_generator(self.run_tts(text))
await self.push_frame(TTSStoppedFrame())
@@ -132,9 +138,12 @@ class TTSService(AIService):
if isinstance(frame, TextFrame):
await self._process_text_frame(frame)
elif isinstance(frame, EndFrame):
if self._current_sentence:
await self._push_tts_frames(self._current_sentence)
elif isinstance(frame, StartInterruptionFrame):
self._current_sentence = ""
await self.push_frame(frame, direction)
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame):
self._current_sentence = ""
await self._push_tts_frames(self._current_sentence)
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)

View File

@@ -12,7 +12,17 @@ import time
from PIL import Image
from typing import AsyncGenerator
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, SystemFrame, TranscriptionFrame, URLImageRawFrame
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
StartFrame,
StartInterruptionFrame,
SystemFrame,
TranscriptionFrame,
URLImageRawFrame)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService, TTSService, ImageGenService
from pipecat.services.openai import BaseOpenAILLMService
@@ -34,7 +44,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Azure TTS, you need to `pip install pipecat-ai[azure]`. Also, set `AZURE_SPEECH_API_KEY` and `AZURE_SPEECH_REGION` environment variables.")
"In order to use Azure, you need to `pip install pipecat-ai[azure]`. Also, set `AZURE_SPEECH_API_KEY` and `AZURE_SPEECH_REGION` environment variables.")
raise Exception(f"Missing module: {e}")
@@ -123,12 +133,18 @@ class AzureSTTService(AIService):
speech_config=speech_config, audio_config=audio_config)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
# This event will be used to ignore out-of-band transcriptions while we
# are itnerrupted.
self._is_interrupted_event = asyncio.Event()
self._create_push_task()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, SystemFrame):
if isinstance(frame, StartInterruptionFrame):
await self._handle_interruptions(frame)
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
elif isinstance(frame, AudioRawFrame):
self._audio_stream.write(frame.audio)
@@ -148,6 +164,16 @@ class AzureSTTService(AIService):
self._push_frame_task.cancel()
await self._push_frame_task
async def _handle_interruptions(self, frame: Frame):
# Cancel the task. This will stop pushing frames downstream.
self._push_frame_task.cancel()
await self._push_frame_task
# Push an out-of-band frame (i.e. not using the ordered push
# frame task).
await self.push_frame(frame)
# Create a new queue and task.
self._create_push_task()
def _create_push_task(self):
self._push_queue = asyncio.Queue()
self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from cartesia.tts import AsyncCartesiaTTS
from cartesia import AsyncCartesia
from typing import AsyncGenerator
@@ -20,22 +20,24 @@ class CartesiaTTSService(TTSService):
self,
*,
api_key: str,
voice_name: str,
model_id: str = "upbeat-moon",
output_format: str = "pcm_16000",
voice_id: str,
model_id: str = "sonic-english",
encoding: str = "pcm_s16le",
sample_rate: int = 16000,
**kwargs):
super().__init__(**kwargs)
self._api_key = api_key
self._voice_name = voice_name
self._model_id = model_id
self._output_format = output_format
self._output_format = {
"container": "raw",
"encoding": encoding,
"sample_rate": sample_rate,
}
try:
self._client = AsyncCartesiaTTS(api_key=self._api_key)
voices = self._client.get_voices()
voice_id = voices[self._voice_name]["id"]
self._voice = self._client.get_voice_embedding(voice_id=voice_id)
self._client = AsyncCartesia(api_key=self._api_key)
self._voice = self._client.voices.get(id=voice_id)
except Exception as e:
logger.error(f"{self} initialization error: {e}")
@@ -48,16 +50,16 @@ class CartesiaTTSService(TTSService):
try:
await self.start_ttfb_metrics()
chunk_generator = await self._client.generate(
chunk_generator = await self._client.tts.sse(
stream=True,
transcript=text,
voice=self._voice,
voice_embedding=self._voice["embedding"],
model_id=self._model_id,
output_format=self._output_format,
)
async for chunk in chunk_generator:
await self.stop_ttfb_metrics()
yield AudioRawFrame(chunk["audio"], chunk["sampling_rate"], 1)
yield AudioRawFrame(chunk["audio"], self._output_format["sample_rate"], 1)
except Exception as e:
logger.error(f"{self} exception: {e}")

View File

@@ -18,20 +18,28 @@ from pipecat.frames.frames import (
Frame,
InterimTranscriptionFrame,
StartFrame,
StartInterruptionFrame,
SystemFrame,
TranscriptionFrame)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService, TTSService
from deepgram import (
DeepgramClient,
DeepgramClientOptions,
LiveTranscriptionEvents,
LiveOptions,
)
from loguru import logger
# See .env.example for Deepgram configuration needed
try:
from deepgram import (
DeepgramClient,
DeepgramClientOptions,
LiveTranscriptionEvents,
LiveOptions,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Deepgram, you need to `pip install pipecat-ai[deepgram]`. Also, set `DEEPGRAM_API_KEY` environment variable.")
raise Exception(f"Missing module: {e}")
class DeepgramTTSService(TTSService):
@@ -114,7 +122,9 @@ class DeepgramSTTService(AIService):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, SystemFrame):
if isinstance(frame, StartInterruptionFrame):
await self._handle_interruptions(frame)
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
elif isinstance(frame, AudioRawFrame):
await self._connection.send(frame.audio)
@@ -137,6 +147,16 @@ class DeepgramSTTService(AIService):
self._push_frame_task.cancel()
await self._push_frame_task
async def _handle_interruptions(self, frame: Frame):
# Cancel the task. This will stop pushing frames downstream.
self._push_frame_task.cancel()
await self._push_frame_task
# Push an out-of-band frame (i.e. not using the ordered push
# frame task).
await self.push_frame(frame)
# Create a new queue and task.
self._create_push_task()
def _create_push_task(self):
self._push_queue = asyncio.Queue()
self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())

View File

@@ -55,7 +55,7 @@ class BaseInputTransport(FrameProcessor):
async def push_audio_frame(self, frame: AudioRawFrame):
if self._params.audio_in_enabled or self._params.vad_enabled:
self._audio_in_queue.put_nowait(frame)
await self._audio_in_queue.put(frame)
#
# Frame processor
@@ -113,10 +113,15 @@ class BaseInputTransport(FrameProcessor):
# Make sure we notify about interruptions quickly out-of-band
if isinstance(frame, UserStartedSpeakingFrame):
logger.debug("User started speaking")
# Cancel the task. This will stop pushing frames downstream.
self._push_frame_task.cancel()
await self._push_frame_task
self._create_push_task()
# Push an out-of-band frame (i.e. not using the ordered push
# frame task) to stop everything, specially at the output
# transport.
await self.push_frame(StartInterruptionFrame())
# Create a new queue and task.
self._create_push_task()
elif isinstance(frame, UserStoppedSpeakingFrame):
logger.debug("User stopped speaking")
await self.push_frame(StopInterruptionFrame())

View File

@@ -35,7 +35,7 @@ except ModuleNotFoundError as e:
class FastAPIWebsocketParams(TransportParams):
add_wav_header: bool = False
audio_frame_size: int = 6400 # 200ms
serializer: FrameSerializer = TwilioFrameSerializer()
serializer: FrameSerializer
class FastAPIWebsocketCallbacks(BaseModel):
@@ -125,7 +125,7 @@ class FastAPIWebsocketTransport(BaseTransport):
def __init__(
self,
websocket: WebSocket,
params: FastAPIWebsocketParams = FastAPIWebsocketParams(),
params: FastAPIWebsocketParams,
input_name: str | None = None,
output_name: str | None = None,
loop: asyncio.AbstractEventLoop | None = None):

View File

@@ -209,19 +209,18 @@ class DailyTransportClient(EventHandler):
async def read_next_audio_frame(self) -> AudioRawFrame | None:
sample_rate = self._params.audio_in_sample_rate
num_channels = self._params.audio_in_channels
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
if self._other_participant_has_joined:
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
future = self._loop.create_future()
self._speaker.read_frames(num_frames, completion=completion_callback(future))
audio = await future
future = self._loop.create_future()
self._speaker.read_frames(num_frames, completion=completion_callback(future))
audio = await future
if len(audio) > 0:
return AudioRawFrame(audio=audio, sample_rate=sample_rate, num_channels=num_channels)
else:
# If no one has ever joined the meeting `read_frames()` would block,
# instead we just wait a bit. daily-python should probably return
# silence instead.
# If we don't read any audio it could be there's no participant
# connected. daily-python will return immediately if that's the
# case, so let's sleep for a little bit (i.e. busy wait).
await asyncio.sleep(0.01)
return None

View File

@@ -36,6 +36,9 @@ class SileroVADAnalyzer(VADAnalyzer):
def __init__(self, sample_rate=16000, params: VADParams = VADParams()):
super().__init__(sample_rate=sample_rate, num_channels=1, params=params)
if sample_rate != 16000 and sample_rate != 8000:
raise Exception("Silero VAD sample rate needs to be 16000 or 8000")
logger.debug("Loading Silero VAD model...")
(self._model, utils) = torch.hub.load(
@@ -51,7 +54,7 @@ class SileroVADAnalyzer(VADAnalyzer):
#
def num_frames_required(self) -> int:
return int(self.sample_rate / 100) * 4 # 40ms
return 512 if self.sample_rate == 16000 else 256
def voice_confidence(self, buffer) -> float:
try: