Compare commits
29 Commits
greedy-plu ... jpt/fastbo
| Author | SHA1 | Date | |
|---|---|---|---|
| | 5bd5d22270 | | |
| | 6ee7932337 | | |
| | c407445dd1 | | |
| | 447f37167e | | |
| | 354c21500e | | |
| | 5728e25b5a | | |
| | 0b6a19802f | | |
| | c4a2d2197c | | |
| | 269d06aa15 | | |
| | dfef1f2c54 | | |
| | b62beaba0b | | |
| | adf414e40f | | |
| | dc64e57f63 | | |
| | d3e410b2ac | | |
| | c544b2474b | | |
| | 18243de358 | | |
| | 6625895d1f | | |
| | f9ecce739e | | |
| | 0075dd8386 | | |
| | eef1cde816 | | |
| | 8d867c30c6 | | |
| | 42c668b7ae | | |
| | b62227b4ae | | |
| | 25ef0cb87b | | |
| | e195941aa5 | | |
| | e09eef1dd7 | | |
| | 7c13663a4e | | |
| | 5753869e5e | | |
| | ba878a19f4 | | |
22
CHANGELOG.md
@@ -5,14 +5,34 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [0.0.32] - 2024-06-22

### Added

- Allow specifying a `DeepgramSTTService` url which allows using on-prem
  Deepgram.

- Added new `FastAPIWebsocketTransport`. This is a new websocket transport that
  can be integrated with FastAPI websockets.

- Added new `TwilioFrameSerializer`. This is a new serializer that knows how to
  serialize and deserialize audio frames from Twilio.

- Added Daily transport event: `on_dialout_answered`. See
  https://reference-python.daily.co/api_reference.html#daily.EventHandler

- Added new `AzureSTTService`. This allows you to use Azure Speech-To-Text.

### Performance

- Convert `BaseInputTransport` and `BaseOutputTransport` to fully use asyncio
  and remove the use of threads.

### Other

- Added `twilio-chatbot`. This is an example that shows how to integrate Twilio
  phone numbers with a Pipecat bot.

- Updated `07f-interruptible-azure.py` to use `AzureLLMService`,
  `AzureSTTService` and `AzureTTSService`.

@@ -2,6 +2,7 @@ autopep8~=2.1.0
build~=1.2.1
grpcio-tools~=1.62.2
pip-tools~=7.4.1
pyright~=1.1.367
pytest~=8.2.0
setuptools~=69.5.1
setuptools_scm~=8.1.0
@@ -32,14 +32,15 @@ Next, follow the steps in the README for each demo.

## Projects:

| Project | Description | Services |
| -------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------- |
| [Simple Chatbot](simple-chatbot) | Basic voice-driven conversational bot. A good starting point for learning the flow of the framework. | Deepgram, OpenAI, Daily, Daily Prebuilt UI |
| [Storytelling Chatbot](storytelling-chatbot) | Stitches together multiple third-party services to create a collaborative storytime experience. | Deepgram, ElevenLabs, Open AI, Fal, Daily, Custom UI |
| [Translation Chatbot](translation-chatbot) | Listens for user speech, then translates that speech to Spanish and speaks the translation back. Demonstrates multi-participant use-cases. | Deepgram, Azure, OpenAI, Daily, Daily Prebuilt UI |
| [Moondream Chatbot](moondream-chatbot) | Demonstrates how to add vision capabilities to GPT4. **Note: works best with a GPU** | Deepgram, OpenAI, Moondream, Daily, Daily Prebuilt UI |
| Function-calling Chatbot (TBC) | A chatbot that can call functions in response to user input. | Deepgram, OpenAI, Fireworks, Daily, Daily Prebuilt UI |
| [Dialin Chatbot](dialin-chatbot) | A chatbot that connects to an incoming phone call from Daily or Twilio. | Deepgram, OpenAI, ElevenLabs, Daily, Twilio |
| Project | Description | Services |
|----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------|
| [Simple Chatbot](simple-chatbot) | Basic voice-driven conversational bot. A good starting point for learning the flow of the framework. | Deepgram, ElevenLabs, OpenAI, Daily, Daily Prebuilt UI |
| [Storytelling Chatbot](storytelling-chatbot) | Stitches together multiple third-party services to create a collaborative storytime experience. | Deepgram, ElevenLabs, OpenAI, Fal, Daily, Custom UI |
| [Translation Chatbot](translation-chatbot) | Listens for user speech, then translates that speech to Spanish and speaks the translation back. Demonstrates multi-participant use-cases. | Deepgram, Azure, OpenAI, Daily, Daily Prebuilt UI |
| [Moondream Chatbot](moondream-chatbot) | Demonstrates how to add vision capabilities to GPT4. **Note: works best with a GPU** | Deepgram, ElevenLabs, OpenAI, Moondream, Daily, Daily Prebuilt UI |
| [Patient intake](patient-intake) | A chatbot that can call functions in response to user input. | Deepgram, ElevenLabs, OpenAI, Daily, Daily Prebuilt UI |
| [Dialin Chatbot](dialin-chatbot) | A chatbot that connects to an incoming phone call from Daily or Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
| [Twilio Chatbot](twilio-chatbot) | A chatbot that connects to an incoming phone call from Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |

> [!IMPORTANT]
> These example projects use Daily as a WebRTC transport and can be joined using their hosted Prebuilt UI.
165
examples/fast-chatbot/.gitignore
vendored
Normal file
@@ -0,0 +1,165 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml

# custom script to recursively upgrade items in requirements.py
upgrade_requirements.py
.DS_Store
0
examples/fast-chatbot/README.md
Normal file
164
examples/fast-chatbot/bot.py
Normal file
@@ -0,0 +1,164 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

from loguru import logger
import argparse
import asyncio
import aiohttp
import os
import sys
import time
from typing import Optional

from pydantic import BaseModel, ValidationError

from pipecat.vad.vad_analyzer import VADParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.pipeline import Pipeline
from pipecat.frames.frames import LLMMessagesFrame, EndFrame

from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator, LLMUserResponseAggregator
)

from helpers import (
    ClearableDeepgramTTSService,
    AudioVolumeTimer,
    TranscriptionTimingLogger
)


from dotenv import load_dotenv
load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "DEBUG"))


class BotSettings(BaseModel):
    room_url: str
    room_token: str
    bot_name: str = "Pipecat"
    prompt: Optional[str] = "You are a helpful assistant."
    deepgram_api_key: Optional[str] = os.getenv("DEEPGRAM_API_KEY", None)
    deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE", "aura-asteria-en")
    deepgram_tts_base_url: Optional[str] = os.getenv(
        "DEEPGRAM_TTS_BASE_URL", "https://api.deepgram.com/v1/speak")
    # NOTE: this default repeats the TTS ("speak") URL above, which looks
    # unintended for speech-to-text; set DEEPGRAM_STT_BASE_URL to override it.
    deepgram_stt_base_url: Optional[str] = os.getenv(
        "DEEPGRAM_STT_BASE_URL", "https://api.deepgram.com/v1/speak")
    # No trailing commas here: a trailing comma would turn the default into a tuple.
    openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY", None)
    openai_model: Optional[str] = os.getenv("OPENAI_MODEL", None)
    openai_base_url: Optional[str] = os.getenv("OPENAI_BASE_URL", None)
    # os.getenv returns a string when the variable is set, so convert explicitly.
    vad_stop_secs: Optional[float] = float(os.getenv("VAD_STOP_SECS", "0.2"))


async def main(settings: BotSettings):
    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            settings.room_url,
            settings.room_token,
            settings.bot_name,
            DailyParams(
                audio_out_enabled=True,
                transcription_enabled=False,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(params=VADParams(
                    stop_secs=settings.vad_stop_secs
                )),
                vad_audio_passthrough=True
            )
        )

        stt = DeepgramSTTService(
            name="STT",
            api_key=settings.deepgram_api_key,
            url=settings.deepgram_stt_base_url
        )

        tts = ClearableDeepgramTTSService(
            name="Voice",
            aiohttp_session=session,
            api_key=settings.deepgram_api_key,
            voice=settings.deepgram_voice,
            **({'base_url': url} if (url := settings.deepgram_tts_base_url) else {})
        )

        llm = OpenAILLMService(
            name="LLM",
            api_key=settings.openai_api_key,
            model=settings.openai_model,
            base_url=settings.openai_base_url,
        )

        messages = [
            {
                "role": "system",
                "content": settings.prompt,
            },
        ]

        avt = AudioVolumeTimer()
        tl = TranscriptionTimingLogger(avt)

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        pipeline = Pipeline([
            transport.input(),   # Transport user input
            avt,                 # Audio volume timer
            stt,                 # Speech-to-text
            tl,                  # Transcription timing logger
            tma_in,              # User responses
            llm,                 # LLM
            tts,                 # TTS
            transport.output(),  # Transport bot output
            tma_out,             # Assistant spoken responses
        ])

        task = PipelineTask(
            pipeline,
            PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                report_only_initial_ttfb=True
            ))

        # When the participant leaves, we exit the bot.
        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            await task.queue_frame(EndFrame())

        # When the first participant joins, the bot should introduce itself.
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            # Provide some air whilst tracks subscribe. Use asyncio.sleep so the
            # event loop is not blocked while waiting.
            await asyncio.sleep(2)
            messages.append(
                {
                    "role": "system",
                    "content": "Briefly introduce yourself by saying 'hello, I'm FastBot, how can I help you today?'"})
            await task.queue_frames([LLMMessagesFrame(messages)])

        runner = PipelineRunner()
        await runner.run(task)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Pipecat Bot")
    parser.add_argument("-s", "--settings", type=str, required=True, help="Pipecat bot settings")

    args, unknown = parser.parse_known_args()

    try:
        settings = BotSettings.model_validate_json(args.settings)
        asyncio.run(main(settings))
    except ValidationError as e:
        print(e)
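The `BotSettings` defaults in `bot.py` are read with `os.getenv`, which returns a string whenever the variable is set. As a minimal sketch of converting a numeric setting such as `VAD_STOP_SECS` before it reaches the model (the `env_float` helper is hypothetical and is not part of pipecat or of this example):

```python
import os


def env_float(name: str, default: float) -> float:
    """Read an environment variable as a float, falling back to `default`.

    Hypothetical helper for illustration only.
    """
    raw = os.getenv(name)
    if raw is None or raw.strip() == "":
        return default
    try:
        return float(raw)
    except ValueError:
        # An unparsable value falls back to the default instead of raising.
        return default


# Example: the VAD stop threshold used by the bot, in seconds.
vad_stop_secs = env_float("VAD_STOP_SECS", 0.2)
```

Falling back silently on a malformed value is a design choice made for the sketch; raising instead may be preferable when misconfiguration should be caught early.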
164
examples/fast-chatbot/bot_runner.py
Normal file
@@ -0,0 +1,164 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import os
import argparse
import subprocess

from pydantic import BaseModel, ValidationError
from typing import Optional

from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams

from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from bot import BotSettings

from dotenv import load_dotenv
load_dotenv(override=True)


# ------------ Configuration ------------ #

MAX_SESSION_TIME = 5 * 60  # 5 minutes
REQUIRED_ENV_VARS = ['DAILY_API_URL', 'DAILY_API_KEY', 'DEEPGRAM_API_KEY']

daily_rest_helper = DailyRESTHelper(
    os.getenv("DAILY_API_KEY", ""),
    os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))


class RunnerSettings(BaseModel):
    prompt: Optional[str] = "You are a fast, low-latency chatbot. Your goal is to demonstrate voice-driven AI capabilities at human-like speeds. When introducing yourself briefly mention your goal is to showcase speed and conversational flow. The technology powering you is Daily for transport, Cerebrium for GPU hosting, Llama 3 (8-B version) LLM, and Deepgram for speech-to-text and text-to-speech. You are hosted on the east coast of the United States. Respond to what the user said in a creative and helpful way, but keep responses short and legible. Ensure responses contain only words. Check again that you have not included special characters other than '?' or '!'."
    deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE")
    openai_model: Optional[str] = os.getenv("OPENAI_MODEL", "gpt-4o")
    openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY")
    test: Optional[bool] = None

# ----------------- API ----------------- #


app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)

# ----------------- Main ----------------- #


@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
    runner_settings = RunnerSettings()
    try:
        request_body = await request.body()
        if len(request_body) > 0:
            runner_settings = RunnerSettings.model_validate_json(request_body)
    except ValidationError as e:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid request: {e}")
    except Exception:
        # If no data in request, pass
        pass

    # Is this a webhook creation request?
    if runner_settings.test is not None:
        return JSONResponse({"test": True})

    # Use specified room URL, or create a new one if not specified
    room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")

    if not room_url:
        params = DailyRoomParams(
            properties=DailyRoomProperties()
        )
        try:
            room: DailyRoomObject = daily_rest_helper.create_room(params=params)
        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"Unable to provision room {e}")
    else:
        # Check passed room URL exists, we should assume that it already has a sip set up
        try:
            room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
        except Exception:
            raise HTTPException(
                status_code=500, detail=f"Room not found: {room_url}")

    # Give the agent a token to join the session
    token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)

    if not room or not token:
        raise HTTPException(
            status_code=500, detail=f"Failed to get token for room: {room_url}")

    # Spawn a new agent, and join the user session
    try:
        bot_settings = BotSettings(
            room_url=room.url,
            room_token=token,
            prompt=runner_settings.prompt,
            deepgram_voice=runner_settings.deepgram_voice,
            openai_model=runner_settings.openai_model,
            openai_api_key=runner_settings.openai_api_key,
        )
        bot_settings_str = bot_settings.model_dump_json(exclude_none=True)

        # Pass the arguments as a list without a shell, so that quotes or other
        # shell metacharacters in the JSON settings (the prompt comes from the
        # request body) cannot break or inject into the command.
        subprocess.Popen(
            ["python3", "-m", "bot", "-s", bot_settings_str],
            bufsize=1,
            cwd=os.path.dirname(os.path.abspath(__file__)))
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to start subprocess: {e}")

    # Grab a token for the user to join with
    user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)

    return JSONResponse({
        "room_url": room.url,
        "token": user_token,
    })


if __name__ == "__main__":
    # Check environment variables
    for env_var in REQUIRED_ENV_VARS:
        if env_var not in os.environ:
            raise Exception(f"Missing environment variable: {env_var}.")

    parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
    parser.add_argument("--host", type=str,
                        default=os.getenv("HOST", "0.0.0.0"), help="Host address")
    parser.add_argument("--port", type=int,
                        default=os.getenv("PORT", 7860), help="Port number")
    parser.add_argument("--reload", action="store_true",
                        default=True, help="Reload code on change")

    config = parser.parse_args()

    try:
        import uvicorn

        uvicorn.run(
            "bot_runner:app",
            host=config.host,
            port=config.port,
            reload=config.reload
        )

    except KeyboardInterrupt:
        print("Pipecat runner shutting down...")
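The runner hands the bot its settings as a JSON string on the command line. The sketch below illustrates, under stated assumptions, why an argument list is a safer way to pass such a string than interpolating it into a shell command: the JSON arrives in the child process unchanged even when it contains quotes or semicolons. The function names `build_bot_command` and `echo_argument` are hypothetical, and the child here is a throwaway `python -c` script rather than the real `bot` module.

```python
import json
import subprocess
import sys


def build_bot_command(settings_json: str) -> list:
    """Return an argv list that passes the settings as a single argument."""
    return [sys.executable, "-m", "bot", "-s", settings_json]


def echo_argument(value: str) -> str:
    """Round-trip `value` through a child process to show it arrives intact."""
    result = subprocess.run(
        [sys.executable, "-c", "import sys; sys.stdout.write(sys.argv[1])", value],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout


# A prompt containing characters that a shell would interpret.
settings = json.dumps({"prompt": "Say 'hello'; then stop."})
command = build_bot_command(settings)
received = echo_argument(settings)
```

With a list and no `shell=True`, no quoting layer sits between the runner and the child, so nothing needs escaping.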
12
examples/fast-chatbot/env.example
Normal file
@@ -0,0 +1,12 @@
DAILY_SAMPLE_ROOM_URL= #optional: use the same room each time, or create a new one if unset
DAILY_API_KEY=
DAILY_API_URL=

DEEPGRAM_API_KEY=
DEEPGRAM_VOICE=
DEEPGRAM_STT_BASE_URL=
DEEPGRAM_TTS_BASE_URL=

OPENAI_API_KEY=
OPENAI_MODEL=
OPENAI_BASE_URL=
267
examples/fast-chatbot/helpers.py
Normal file
@@ -0,0 +1,267 @@
from loguru import logger
import asyncio
import math
import struct
import time
from dataclasses import dataclass, field
from typing import List


from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
    Frame,
    AudioRawFrame,
    InterimTranscriptionFrame,
    TranscriptionFrame,
    TextFrame,
    StartInterruptionFrame,
    LLMFullResponseStartFrame,
    TTSStoppedFrame,
    MetricsFrame
)

from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame


class GreedyLLMAggregator(FrameProcessor):
    def __init__(self, context: OpenAILLMContext = None, **kwargs):
        super().__init__(**kwargs)
        self.context: OpenAILLMContext = context if context else OpenAILLMContext()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        logger.debug(f"{frame}")

        try:
            if isinstance(frame, InterimTranscriptionFrame):
                return

            if isinstance(frame, TranscriptionFrame):
                # append transcribed text to last "user" frame
                if self.context.messages and self.context.messages[-1]["role"] == "user":
                    last_frame = self.context.messages.pop()
                else:
                    last_frame = {"role": "user", "content": ""}

                last_frame["content"] += " " + frame.text
                self.context.messages.append(last_frame)

                oai_context_frame = OpenAILLMContextFrame(context=self.context)
                logger.debug(f"pushing frame {oai_context_frame}")
                await self.push_frame(oai_context_frame)
                return

            await self.push_frame(frame, direction)
        except Exception as e:
            logger.debug(f"error: {e}")


class ClearableDeepgramTTSService(DeepgramTTSService):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, StartInterruptionFrame):
            self._current_sentence = ""


@dataclass
class BufferedSentence:
    audio_frames: List[AudioRawFrame] = field(default_factory=list)
    text_frame: TextFrame = None


class VADGate(FrameProcessor):

    def __init__(
            self,
            vad_analyzer: VADAnalyzer = None,
            context: OpenAILLMContext = None,
            **kwargs):
        super().__init__(**kwargs)
        self.vad_analyzer = vad_analyzer
        self.context = context

        self._audio_pusher_task = None
        self._expect_text_frame_next = False
        self._sentences: List[BufferedSentence] = []

    # queue output from tts one sentence at a time. associate a buffer of audio frames with the content of
    # each text frame.
    #
    # start a coroutine to service the queue and send sentences down the pipeline when possible.
    #   1. do not send anything when we are not in VADState.QUIET
    #   2. if we are in VADState.QUIET, send a sentence, estimate how long it will take for that sentence
    #      to output, sleep until it's time to send another sentence
    #   3. each time we send a sentence, append it to the conversation context
    #   4. when the sentence buffer becomes empty, cancel the coroutine
    #   5. if we get a new LLMFullResponse, treat that as a cancellation, too

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        try:

            # A TTSService will emit a series of AudioRawFrame objects, then a TTSStoppedFrame,
            # then a TextFrame.

            if self._expect_text_frame_next:
                self._expect_text_frame_next = False
                if isinstance(frame, TextFrame):
                    self._sentences[-1].text_frame = frame
                else:
                    logger.debug(f"expected a text frame, but received {frame}")
                    await self.push_frame(frame, direction)
                return
            else:
                if isinstance(frame, TextFrame):
                    logger.error("received a text frame, wasn't expecting it.")

            if isinstance(frame, AudioRawFrame):
                # if our buffer is empty or has a "finished" sentence at the end,
                # then we need to start buffering a new sentence
                if not self._sentences or self._sentences[-1].text_frame:
                    self._sentences.append(BufferedSentence())
                self._sentences[-1].audio_frames.append(frame)
                await self.maybe_start_audio_pusher_task()
                return

            if isinstance(frame, TTSStoppedFrame):
                self._expect_text_frame_next = True
                await self.push_frame(frame, direction)
                return

            # There are two ways we can be interrupted. During greedy inference, a new
            # LLM response can start. Or, during playout, we can get a traditional
            # user interruption frame.
            if (isinstance(frame, LLMFullResponseStartFrame) or
                    isinstance(frame, StartInterruptionFrame)):
                logger.debug(f"{frame} - Handle interruption in VADGate")
                self._sentences = []
                if self._audio_pusher_task:
                    self._audio_pusher_task.cancel()
                    self._audio_pusher_task = None
                await self.push_frame(frame, direction)
                return

            await self.push_frame(frame, direction)
        except Exception as e:
            logger.debug(f"error: {e}")

    async def maybe_start_audio_pusher_task(self):
        try:
            if self._audio_pusher_task:
                return
            self._audio_pusher_task = self.get_event_loop().create_task(self.push_audio())

        except Exception as e:
            logger.debug(f"Exception {e}")

    async def push_audio(self):
        try:
            while True:
                if not self._sentences:
                    await asyncio.sleep(0.01)
                    continue

                if self.vad_analyzer._vad_state != VADState.QUIET:
                    await asyncio.sleep(0.01)
                    continue

                # we only want to push completed sentence buffers
                if not self._sentences[0].text_frame:
                    await asyncio.sleep(0.01)
                    continue

                s = self._sentences.pop(0)
                if not s.audio_frames:
                    continue
                sample_rate = s.audio_frames[0].sample_rate
                duration = 0
                logger.debug(f"Pushing {len(s.audio_frames)} audio frames")
                for frame in s.audio_frames:
                    await self.push_frame(frame)
                    # assume linear16 encoding (2 bytes per sample). todo: add some more
                    # metadata to AudioRawFrame, maybe
                    duration += (len(frame.audio) / 2 / frame.num_channels) / sample_rate
                await asyncio.sleep(duration - 20 / 1000)
                if self.context:
                    logger.debug(f"Appending assistant message to context: [{s.text_frame.text}]")
                    self.context.messages.append(
                        {"role": "assistant", "content": s.text_frame.text}
                    )
                await self.push_frame(s.text_frame)

        except Exception as e:
            logger.debug(f"Exception {e}")


class TranscriptionTimingLogger(FrameProcessor):
    def __init__(self, avt):
        super().__init__()
        self.name = "Transcription"
        self._avt = avt

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        try:
            await super().process_frame(frame, direction)
            if isinstance(frame, TranscriptionFrame):
                elapsed = time.time() - self._avt.last_transition_ts
                logger.debug(f"Transcription TTF: {elapsed}")
                await self.push_frame(MetricsFrame(ttfb={self.name: elapsed}))

            await self.push_frame(frame, direction)
        except Exception as e:
            logger.debug(f"Exception {e}")


class AudioVolumeTimer(FrameProcessor):
    def __init__(self):
        super().__init__()
        self.last_transition_ts = 0
        self._prev_volume = -80
        self._speech_volume_threshold = -50

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, AudioRawFrame):
            volume = self.calculate_volume(frame)
            # print(f"Audio volume: {volume:.2f} dB")
            if (volume >= self._speech_volume_threshold and
                    self._prev_volume < self._speech_volume_threshold):
                # logger.debug("transition above speech volume threshold")
                self.last_transition_ts = time.time()
            elif (volume < self._speech_volume_threshold and
                    self._prev_volume >= self._speech_volume_threshold):
                # logger.debug("transition below non-speech volume threshold")
                self.last_transition_ts = time.time()
            self._prev_volume = volume

        await self.push_frame(frame, direction)

    def calculate_volume(self, frame: AudioRawFrame) -> float:
        if frame.num_channels != 1:
            raise ValueError(f"Expected 1 channel, got {frame.num_channels}")

        # Unpack audio data into 16-bit integers
        fmt = f"{len(frame.audio) // 2}h"
        audio_samples = struct.unpack(fmt, frame.audio)

        # Calculate RMS
        sum_squares = sum(sample**2 for sample in audio_samples)
        rms = math.sqrt(sum_squares / len(audio_samples))

        # Convert RMS to decibels (dB)
        # Reference: maximum value for 16-bit audio is 32767
        if rms > 0:
            db = 20 * math.log10(rms / 32767)
        else:
            db = -96  # Minimum value (almost silent)

        return db
|
||||
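As a standalone illustration of the level measurement in `AudioVolumeTimer.calculate_volume` above, the following sketch computes the RMS level of 16-bit mono PCM in dB relative to full scale and flags threshold crossings. It assumes little-endian signed 16-bit samples; the names `rms_dbfs` and `crossed_threshold`, and the empty-buffer guard, are illustrative and not part of the PR.

```python
import math
import struct

FULL_SCALE = 32767   # maximum magnitude of a signed 16-bit sample
SILENCE_DB = -96.0   # floor returned for silent input (same floor as the code above)


def rms_dbfs(audio: bytes) -> float:
    """Return the RMS level of 16-bit mono PCM in dB relative to full scale."""
    count = len(audio) // 2
    if count == 0:
        return SILENCE_DB  # assumption: treat an empty buffer as silence
    # Assumption: samples are little-endian signed 16-bit integers.
    samples = struct.unpack(f"<{count}h", audio[: count * 2])
    rms = math.sqrt(sum(s * s for s in samples) / count)
    if rms == 0:
        return SILENCE_DB
    return 20 * math.log10(rms / FULL_SCALE)


def crossed_threshold(prev_db: float, db: float, threshold: float = -50.0) -> bool:
    """True when the level moves from one side of the threshold to the other."""
    return (prev_db < threshold) != (db < threshold)


if __name__ == "__main__":
    silence = struct.pack("<4h", 0, 0, 0, 0)
    loud = struct.pack("<4h", 32767, -32767, 32767, -32767)
    print(rms_dbfs(silence))
    print(rms_dbfs(loud))
    print(crossed_threshold(rms_dbfs(silence), rms_dbfs(loud)))
```

A crossing in either direction corresponds to the two branches that update `last_transition_ts` in the processor above.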
6
examples/fast-chatbot/requirements.txt
Normal file
@@ -0,0 +1,6 @@
pipecat-ai[daily,openai,silero,deepgram]
fastapi
uvicorn
requests
python-dotenv
loguru
161
examples/twilio-chatbot/.gitignore
vendored
Normal file
@@ -0,0 +1,161 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml
20
examples/twilio-chatbot/Dockerfile
Normal file
@@ -0,0 +1,20 @@
# Use an official Python runtime as a parent image
FROM python:3.10-bullseye

# Set the working directory in the container
WORKDIR /twilio-chatbot

# Copy the requirements file into the container
COPY requirements.txt .

# Install any needed packages specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Copy the current directory contents into the container
COPY . .

# Expose the desired port
EXPOSE 8765

# Run the application
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8765"]
82
examples/twilio-chatbot/README.md
Normal file
@@ -0,0 +1,82 @@
# Twilio Chatbot

This project is a FastAPI-based chatbot that integrates with Twilio over WebSockets to provide real-time voice communication. It exposes one endpoint for starting a call and one for handling the WebSocket connection.

## Table of Contents

- [Features](#features)
- [Requirements](#requirements)
- [Installation](#installation)
- [Configure Twilio URLs](#configure-twilio-urls)
- [Running the Application](#running-the-application)
- [Usage](#usage)

## Features

- **FastAPI**: A modern, high-performance web framework for building APIs with Python.
- **WebSocket Support**: Real-time communication using WebSockets.
- **CORS Middleware**: Allows cross-origin requests for testing.
- **Dockerized**: Easily deployable using Docker.

## Requirements

- Python 3.10
- Docker (for containerized deployment)
- ngrok (for tunneling)
- Twilio Account

## Installation

1. **Set up a virtual environment** (optional but recommended):
   ```sh
   python -m venv venv
   source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
   ```

2. **Install dependencies**:
   ```sh
   pip install -r requirements.txt
   ```

3. **Create .env**:
   Create a `.env` file based on `env.example`.

4. **Install ngrok**:
   Follow the instructions on the [ngrok website](https://ngrok.com/download) to download and install ngrok.

## Configure Twilio URLs

These steps need the public ngrok URL, so start ngrok first (see [Running the Application](#running-the-application)).

1. **Update the Twilio Webhook**:
   Copy the ngrok URL and update your Twilio phone number webhook URL to `http://<ngrok_url>/start_call`.

2. **Update streams.xml**:
   Copy the ngrok URL and update `templates/streams.xml` with `wss://<ngrok_url>/ws`.

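Step 2 above can also be scripted. The sketch below fills in the `wss://` URL and checks that the result parses as XML. `render_streams_xml` and the hostname `example.ngrok.app` are hypothetical; the TwiML layout is copied from `templates/streams.xml` in this example.

```python
import xml.etree.ElementTree as ET

# Same structure as templates/streams.xml, with the host left as a placeholder.
TEMPLATE = """<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Connect>
    <Stream url="wss://{host}/ws"></Stream>
  </Connect>
  <Pause length="40"/>
</Response>
"""


def render_streams_xml(host: str) -> str:
    """Return TwiML that points the media stream at wss://<host>/ws."""
    # Accept a pasted ngrok URL with or without a scheme or trailing slash.
    for prefix in ("https://", "http://", "wss://", "ws://"):
        if host.startswith(prefix):
            host = host[len(prefix):]
    return TEMPLATE.format(host=host.rstrip("/"))


if __name__ == "__main__":
    twiml = render_streams_xml("https://example.ngrok.app/")
    stream = ET.fromstring(twiml.encode("utf-8")).find("./Connect/Stream")
    print(stream.get("url"))
```

Writing the returned string to `templates/streams.xml` would replace the manual edit; the parse step is only a sanity check that the file is still well-formed.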
## Running the Application

### Using Python

1. **Run the FastAPI application**:
   ```sh
   python server.py
   ```

2. **Start ngrok**:
   In a new terminal, start ngrok to tunnel the local server:
   ```sh
   ngrok http 8765
   ```

### Using Docker

1. **Build the Docker image**:
   ```sh
   docker build -t twilio-chatbot .
   ```

2. **Run the Docker container**:
   ```sh
   docker run -it --rm -p 8765:8765 twilio-chatbot
   ```

## Usage

To start a call, simply make a call to your Twilio phone number. The webhook URL will direct the call to your FastAPI application, which will handle it accordingly.
88
examples/twilio-chatbot/bot.py
Normal file
@@ -0,0 +1,88 @@
import aiohttp
import os
import sys

from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator,
    LLMUserResponseAggregator
)
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketTransport, FastAPIWebsocketParams
from pipecat.vad.silero import SileroVADAnalyzer

from loguru import logger

from dotenv import load_dotenv
load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def run_bot(websocket_client):
    async with aiohttp.ClientSession() as session:
        transport = FastAPIWebsocketTransport(
            websocket=websocket_client,
            params=FastAPIWebsocketParams(
                audio_out_enabled=True,
                add_wav_header=False,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                vad_audio_passthrough=True
            )
        )

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4o")

        stt = DeepgramSTTService(api_key=os.getenv('DEEPGRAM_API_KEY'))

        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
        )

        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        pipeline = Pipeline([
            transport.input(),   # Websocket input from client
            stt,                 # Speech-To-Text
            tma_in,              # User responses
            llm,                 # LLM
            tts,                 # Text-To-Speech
            transport.output(),  # Websocket output to client
            tma_out              # LLM responses
        ])

        task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))

        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            # Kick off the conversation.
            messages.append(
                {"role": "system", "content": "Please introduce yourself to the user."})
            await task.queue_frames([LLMMessagesFrame(messages)])

        @transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, client):
            await task.queue_frames([EndFrame()])

        runner = PipelineRunner(handle_sigint=False)

        await runner.run(task)
4
examples/twilio-chatbot/env.example
Normal file
@@ -0,0 +1,4 @@
OPENAI_API_KEY=
DEEPGRAM_API_KEY=
ELEVENLABS_API_KEY=
ELEVENLABS_VOICE_ID=
5
examples/twilio-chatbot/requirements.txt
Normal file
@@ -0,0 +1,5 @@
pipecat-ai[daily,openai,silero,deepgram]
fastapi
uvicorn
python-dotenv
loguru
34
examples/twilio-chatbot/server.py
Normal file
@@ -0,0 +1,34 @@
import uvicorn

from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import HTMLResponse

from bot import run_bot

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for testing
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post('/start_call')
async def start_call():
    print("POST TwiML")
    return HTMLResponse(content=open("templates/streams.xml").read(), media_type="application/xml")


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    print("WebSocket connection accepted")
    await run_bot(websocket)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8765)
7
examples/twilio-chatbot/templates/streams.xml
Normal file
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Connect>
    <Stream url="wss://<your server url>/ws"></Stream>
  </Connect>
  <Pause length="40"/>
</Response>
@@ -26,6 +26,8 @@ anyio==4.4.0
|
||||
# anthropic
|
||||
# httpx
|
||||
# openai
|
||||
# starlette
|
||||
# watchfiles
|
||||
async-timeout==4.0.3
|
||||
# via
|
||||
# aiohttp
|
||||
@@ -54,12 +56,15 @@ cffi==1.16.0
|
||||
charset-normalizer==3.3.2
|
||||
# via requests
|
||||
click==8.1.7
|
||||
# via flask
|
||||
# via
|
||||
# flask
|
||||
# typer
|
||||
# uvicorn
|
||||
coloredlogs==15.0.1
|
||||
# via onnxruntime
|
||||
ctranslate2==4.3.1
|
||||
# via faster-whisper
|
||||
daily-python==0.9.1
|
||||
daily-python==0.10.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
dataclasses-json==0.6.7
|
||||
# via
|
||||
@@ -71,17 +76,25 @@ distro==1.9.0
|
||||
# via
|
||||
# anthropic
|
||||
# openai
|
||||
dnspython==2.6.1
|
||||
# via email-validator
|
||||
einops==0.8.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
email-validator==2.2.0
|
||||
# via fastapi
|
||||
exceptiongroup==1.2.1
|
||||
# via
|
||||
# anyio
|
||||
# pytest
|
||||
fal-client==0.4.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
fastapi==0.111.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
fastapi-cli==0.0.4
|
||||
# via fastapi
|
||||
faster-whisper==1.0.2
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
filelock==3.15.1
|
||||
filelock==3.15.3
|
||||
# via
|
||||
# huggingface-hub
|
||||
# pyht
|
||||
@@ -113,7 +126,7 @@ google-api-core[grpc]==2.19.0
|
||||
# google-ai-generativelanguage
|
||||
# google-api-python-client
|
||||
# google-generativeai
|
||||
google-api-python-client==2.133.0
|
||||
google-api-python-client==2.134.0
|
||||
# via google-generativeai
|
||||
google-auth==2.30.0
|
||||
# via
|
||||
@@ -140,24 +153,29 @@ grpcio==1.64.1
|
||||
grpcio-status==1.62.2
|
||||
# via google-api-core
|
||||
h11==0.14.0
|
||||
# via httpcore
|
||||
# via
|
||||
# httpcore
|
||||
# uvicorn
|
||||
httpcore==1.0.5
|
||||
# via httpx
|
||||
httplib2==0.22.0
|
||||
# via
|
||||
# google-api-python-client
|
||||
# google-auth-httplib2
|
||||
httptools==0.6.1
|
||||
# via uvicorn
|
||||
httpx==0.27.0
|
||||
# via
|
||||
# anthropic
|
||||
# cartesia
|
||||
# deepgram-sdk
|
||||
# fal-client
|
||||
# fastapi
|
||||
# openai
|
||||
# openpipe
|
||||
httpx-sse==0.4.0
|
||||
# via fal-client
|
||||
huggingface-hub==0.23.3
|
||||
huggingface-hub==0.23.4
|
||||
# via
|
||||
# faster-whisper
|
||||
# timm
|
||||
@@ -168,6 +186,7 @@ humanfriendly==10.0
|
||||
idna==3.7
|
||||
# via
|
||||
# anyio
|
||||
# email-validator
|
||||
# httpx
|
||||
# requests
|
||||
# yarl
|
||||
@@ -177,41 +196,46 @@ itsdangerous==2.2.0
|
||||
# via flask
|
||||
jinja2==3.1.4
|
||||
# via
|
||||
# fastapi
|
||||
# flask
|
||||
# torch
|
||||
jsonpatch==1.33
|
||||
# via langchain-core
|
||||
jsonpointer==3.0.0
|
||||
# via jsonpatch
|
||||
langchain==0.2.3
|
||||
langchain==0.2.5
|
||||
# via
|
||||
# langchain-community
|
||||
# pipecat-ai (pyproject.toml)
|
||||
langchain-community==0.2.4
|
||||
langchain-community==0.2.5
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
langchain-core==0.2.5
|
||||
langchain-core==0.2.9
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
# langchain-openai
|
||||
# langchain-text-splitters
|
||||
langchain-openai==0.1.8
|
||||
langchain-openai==0.1.9
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
langchain-text-splitters==0.2.1
|
||||
# via langchain
|
||||
langsmith==0.1.77
|
||||
langsmith==0.1.81
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
# langchain-core
|
||||
loguru==0.7.2
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
markdown-it-py==3.0.0
|
||||
# via rich
|
||||
markupsafe==2.1.5
|
||||
# via
|
||||
# jinja2
|
||||
# werkzeug
|
||||
marshmallow==3.21.3
|
||||
# via dataclasses-json
|
||||
mdurl==0.1.2
|
||||
# via markdown-it-py
|
||||
mpmath==1.3.0
|
||||
# via sympy
|
||||
multidict==6.0.5
|
||||
@@ -273,9 +297,11 @@ openai==1.26.0
|
||||
# pipecat-ai (pyproject.toml)
|
||||
openpipe==4.14.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
orjson==3.10.4
|
||||
# via langsmith
|
||||
packaging==23.2
|
||||
orjson==3.10.5
|
||||
# via
|
||||
# fastapi
|
||||
# langsmith
|
||||
packaging==24.1
|
||||
# via
|
||||
# huggingface-hub
|
||||
# langchain-core
|
||||
@@ -289,7 +315,7 @@ pillow==10.3.0
|
||||
# torchvision
|
||||
pluggy==1.5.0
|
||||
# via pytest
|
||||
proto-plus==1.23.0
|
||||
proto-plus==1.24.0
|
||||
# via
|
||||
# google-ai-generativelanguage
|
||||
# google-api-core
|
||||
@@ -317,6 +343,7 @@ pycparser==2.22
|
||||
pydantic==2.7.4
|
||||
# via
|
||||
# anthropic
|
||||
# fastapi
|
||||
# google-generativeai
|
||||
# langchain
|
||||
# langchain-core
|
||||
@@ -324,6 +351,8 @@ pydantic==2.7.4
|
||||
# openai
|
||||
pydantic-core==2.18.4
|
||||
# via pydantic
|
||||
pygments==2.18.0
|
||||
# via rich
|
||||
pyht==0.0.28
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
pyloudnorm==0.1.1
|
||||
@@ -337,7 +366,11 @@ pytest-asyncio==0.23.7
|
||||
python-dateutil==2.9.0.post0
|
||||
# via openpipe
|
||||
python-dotenv==1.0.1
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
# via
|
||||
# pipecat-ai (pyproject.toml)
|
||||
# uvicorn
|
||||
python-multipart==0.0.9
|
||||
# via fastapi
|
||||
pyyaml==6.0.1
|
||||
# via
|
||||
# ctranslate2
|
||||
@@ -347,6 +380,7 @@ pyyaml==6.0.1
|
||||
# langchain-core
|
||||
# timm
|
||||
# transformers
|
||||
# uvicorn
|
||||
regex==2024.5.15
|
||||
# via
|
||||
# tiktoken
|
||||
@@ -362,6 +396,8 @@ requests==2.32.3
|
||||
# pyht
|
||||
# tiktoken
|
||||
# transformers
|
||||
rich==13.7.1
|
||||
# via typer
|
||||
rsa==4.9
|
||||
# via google-auth
|
||||
safetensors==0.4.3
|
||||
@@ -370,6 +406,8 @@ safetensors==0.4.3
|
||||
# transformers
|
||||
scipy==1.13.1
|
||||
# via pyloudnorm
|
||||
shellingham==1.5.4
|
||||
# via typer
|
||||
six==1.16.0
|
||||
# via python-dateutil
|
||||
sniffio==1.3.1
|
||||
@@ -380,15 +418,17 @@ sniffio==1.3.1
|
||||
# openai
|
||||
sounddevice==0.4.7
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
sqlalchemy==2.0.30
|
||||
sqlalchemy==2.0.31
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
starlette==0.37.2
|
||||
# via fastapi
|
||||
sympy==1.12.1
|
||||
# via
|
||||
# onnxruntime
|
||||
# torch
|
||||
tenacity==8.3.0
|
||||
tenacity==8.4.1
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
@@ -424,11 +464,14 @@ transformers==4.40.2
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
triton==2.3.1
|
||||
# via torch
|
||||
typer==0.12.3
|
||||
# via fastapi-cli
|
||||
typing-extensions==4.12.2
|
||||
# via
|
||||
# anthropic
|
||||
# anyio
|
||||
# deepgram-sdk
|
||||
# fastapi
|
||||
# google-generativeai
|
||||
# huggingface-hub
|
||||
# openai
|
||||
@@ -437,20 +480,31 @@ typing-extensions==4.12.2
|
||||
# pydantic-core
|
||||
# sqlalchemy
|
||||
# torch
|
||||
# typer
|
||||
# typing-inspect
|
||||
# uvicorn
|
||||
typing-inspect==0.9.0
|
||||
# via dataclasses-json
|
||||
ujson==5.10.0
|
||||
# via fastapi
|
||||
uritemplate==4.1.1
|
||||
# via google-api-python-client
|
||||
urllib3==2.2.1
|
||||
urllib3==2.2.2
|
||||
# via requests
|
||||
uvicorn[standard]==0.30.1
|
||||
# via fastapi
|
||||
uvloop==0.19.0
|
||||
# via uvicorn
|
||||
verboselogs==1.7
|
||||
# via deepgram-sdk
|
||||
watchfiles==0.22.0
|
||||
# via uvicorn
|
||||
websockets==12.0
|
||||
# via
|
||||
# cartesia
|
||||
# deepgram-sdk
|
||||
# pipecat-ai (pyproject.toml)
|
||||
# uvicorn
|
||||
werkzeug==3.0.3
|
||||
# via flask
|
||||
yarl==1.9.4
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
#
|
||||
# This file is autogenerated by pip-compile with Python 3.10
|
||||
# This file is autogenerated by pip-compile with Python 3.12
|
||||
# by the following command:
|
||||
#
|
||||
# pip-compile --all-extras pyproject.toml
|
||||
@@ -26,10 +26,8 @@ anyio==4.4.0
|
||||
# anthropic
|
||||
# httpx
|
||||
# openai
|
||||
async-timeout==4.0.3
|
||||
# via
|
||||
# aiohttp
|
||||
# langchain
|
||||
# starlette
|
||||
# watchfiles
|
||||
attrs==23.2.0
|
||||
# via
|
||||
# aiohttp
|
||||
@@ -54,12 +52,15 @@ cffi==1.16.0
|
||||
charset-normalizer==3.3.2
|
||||
# via requests
|
||||
click==8.1.7
|
||||
# via flask
|
||||
# via
|
||||
# flask
|
||||
# typer
|
||||
# uvicorn
|
||||
coloredlogs==15.0.1
|
||||
# via onnxruntime
|
||||
ctranslate2==4.3.1
|
||||
# via faster-whisper
|
||||
daily-python==0.9.1
|
||||
daily-python==0.10.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
dataclasses-json==0.6.7
|
||||
# via
|
||||
@@ -71,17 +72,21 @@ distro==1.9.0
|
||||
# via
|
||||
# anthropic
|
||||
# openai
|
||||
dnspython==2.6.1
|
||||
# via email-validator
|
||||
einops==0.8.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
exceptiongroup==1.2.1
|
||||
# via
|
||||
# anyio
|
||||
# pytest
|
||||
email-validator==2.2.0
|
||||
# via fastapi
|
||||
fal-client==0.4.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
fastapi==0.111.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
fastapi-cli==0.0.4
|
||||
# via fastapi
|
||||
faster-whisper==1.0.2
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
filelock==3.15.1
|
||||
filelock==3.15.3
|
||||
# via
|
||||
# huggingface-hub
|
||||
# pyht
|
||||
@@ -112,7 +117,7 @@ google-api-core[grpc]==2.19.0
|
||||
# google-ai-generativelanguage
|
||||
# google-api-python-client
|
||||
# google-generativeai
|
||||
google-api-python-client==2.133.0
|
||||
google-api-python-client==2.134.0
|
||||
# via google-generativeai
|
||||
google-auth==2.30.0
|
||||
# via
|
||||
@@ -137,24 +142,29 @@ grpcio==1.64.1
|
||||
grpcio-status==1.62.2
|
||||
# via google-api-core
|
||||
h11==0.14.0
|
||||
# via httpcore
|
||||
# via
|
||||
# httpcore
|
||||
# uvicorn
|
||||
httpcore==1.0.5
|
||||
# via httpx
|
||||
httplib2==0.22.0
|
||||
# via
|
||||
# google-api-python-client
|
||||
# google-auth-httplib2
|
||||
httptools==0.6.1
|
||||
# via uvicorn
|
||||
httpx==0.27.0
|
||||
# via
|
||||
# anthropic
|
||||
# cartesia
|
||||
# deepgram-sdk
|
||||
# fal-client
|
||||
# fastapi
|
||||
# openai
|
||||
# openpipe
|
||||
httpx-sse==0.4.0
|
||||
# via fal-client
|
||||
huggingface-hub==0.23.3
|
||||
huggingface-hub==0.23.4
|
||||
# via
|
||||
# faster-whisper
|
||||
# timm
|
||||
@@ -165,6 +175,7 @@ humanfriendly==10.0
|
||||
idna==3.7
|
||||
# via
|
||||
# anyio
|
||||
# email-validator
|
||||
# httpx
|
||||
# requests
|
||||
# yarl
|
||||
@@ -174,41 +185,46 @@ itsdangerous==2.2.0
|
||||
# via flask
|
||||
jinja2==3.1.4
|
||||
# via
|
||||
# fastapi
|
||||
# flask
|
||||
# torch
|
||||
jsonpatch==1.33
|
||||
# via langchain-core
|
||||
jsonpointer==3.0.0
|
||||
# via jsonpatch
|
||||
langchain==0.2.3
|
||||
langchain==0.2.5
|
||||
# via
|
||||
# langchain-community
|
||||
# pipecat-ai (pyproject.toml)
|
||||
langchain-community==0.2.4
|
||||
langchain-community==0.2.5
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
langchain-core==0.2.5
|
||||
langchain-core==0.2.9
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
# langchain-openai
|
||||
# langchain-text-splitters
|
||||
langchain-openai==0.1.8
|
||||
langchain-openai==0.1.9
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
langchain-text-splitters==0.2.1
|
||||
# via langchain
|
||||
langsmith==0.1.77
|
||||
langsmith==0.1.81
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
# langchain-core
|
||||
loguru==0.7.2
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
markdown-it-py==3.0.0
|
||||
# via rich
|
||||
markupsafe==2.1.5
|
||||
# via
|
||||
# jinja2
|
||||
# werkzeug
|
||||
marshmallow==3.21.3
|
||||
# via dataclasses-json
|
||||
mdurl==0.1.2
|
||||
# via markdown-it-py
|
||||
mpmath==1.3.0
|
||||
# via sympy
|
||||
multidict==6.0.5
|
||||
@@ -239,9 +255,11 @@ openai==1.26.0
|
||||
# pipecat-ai (pyproject.toml)
|
||||
openpipe==4.14.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
orjson==3.10.4
|
||||
# via langsmith
|
||||
packaging==23.2
|
||||
orjson==3.10.5
|
||||
# via
|
||||
# fastapi
|
||||
# langsmith
|
||||
packaging==24.1
|
||||
# via
|
||||
# huggingface-hub
|
||||
# langchain-core
|
||||
@@ -255,7 +273,7 @@ pillow==10.3.0
|
||||
# torchvision
|
||||
pluggy==1.5.0
|
||||
# via pytest
|
||||
proto-plus==1.23.0
|
||||
proto-plus==1.24.0
|
||||
# via
|
||||
# google-ai-generativelanguage
|
||||
# google-api-core
|
||||
@@ -283,6 +301,7 @@ pycparser==2.22
|
||||
pydantic==2.7.4
|
||||
# via
|
||||
# anthropic
|
||||
# fastapi
|
||||
# google-generativeai
|
||||
# langchain
|
||||
# langchain-core
|
||||
@@ -290,6 +309,8 @@ pydantic==2.7.4
|
||||
# openai
|
||||
pydantic-core==2.18.4
|
||||
# via pydantic
|
||||
pygments==2.18.0
|
||||
# via rich
|
||||
pyht==0.0.28
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
pyloudnorm==0.1.1
|
||||
@@ -303,7 +324,11 @@ pytest-asyncio==0.23.7
|
||||
python-dateutil==2.9.0.post0
|
||||
# via openpipe
|
||||
python-dotenv==1.0.1
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
# via
|
||||
# pipecat-ai (pyproject.toml)
|
||||
# uvicorn
|
||||
python-multipart==0.0.9
|
||||
# via fastapi
|
||||
pyyaml==6.0.1
|
||||
# via
|
||||
# ctranslate2
|
||||
@@ -313,6 +338,7 @@ pyyaml==6.0.1
|
||||
# langchain-core
|
||||
# timm
|
||||
# transformers
|
||||
# uvicorn
|
||||
regex==2024.5.15
|
||||
# via
|
||||
# tiktoken
|
||||
@@ -328,6 +354,8 @@ requests==2.32.3
|
||||
# pyht
|
||||
# tiktoken
|
||||
# transformers
|
||||
rich==13.7.1
|
||||
# via typer
|
||||
rsa==4.9
|
||||
# via google-auth
|
||||
safetensors==0.4.3
|
||||
@@ -336,6 +364,8 @@ safetensors==0.4.3
|
||||
# transformers
|
||||
scipy==1.13.1
|
||||
# via pyloudnorm
|
||||
shellingham==1.5.4
|
||||
# via typer
|
||||
six==1.16.0
|
||||
# via python-dateutil
|
||||
sniffio==1.3.1
|
||||
@@ -346,15 +376,17 @@ sniffio==1.3.1
|
||||
# openai
|
||||
sounddevice==0.4.7
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
sqlalchemy==2.0.30
|
||||
sqlalchemy==2.0.31
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
starlette==0.37.2
|
||||
# via fastapi
|
||||
sympy==1.12.1
|
||||
# via
|
||||
# onnxruntime
|
||||
# torch
|
||||
tenacity==8.3.0
|
||||
tenacity==8.4.1
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
@@ -368,8 +400,6 @@ tokenizers==0.19.1
|
||||
# anthropic
|
||||
# faster-whisper
|
||||
# transformers
|
||||
tomli==2.0.1
|
||||
# via pytest
|
||||
torch==2.3.1
|
||||
# via
|
||||
# pipecat-ai (pyproject.toml)
|
||||
@@ -388,11 +418,13 @@ tqdm==4.66.4
|
||||
# transformers
|
||||
transformers==4.40.2
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
typer==0.12.3
|
||||
# via fastapi-cli
|
||||
typing-extensions==4.12.2
|
||||
# via
|
||||
# anthropic
|
||||
# anyio
|
||||
# deepgram-sdk
|
||||
# fastapi
|
||||
# google-generativeai
|
||||
# huggingface-hub
|
||||
# openai
|
||||
@@ -401,20 +433,30 @@ typing-extensions==4.12.2
|
||||
# pydantic-core
|
||||
# sqlalchemy
|
||||
# torch
|
||||
# typer
|
||||
# typing-inspect
|
||||
typing-inspect==0.9.0
|
||||
# via dataclasses-json
|
||||
ujson==5.10.0
|
||||
# via fastapi
|
||||
uritemplate==4.1.1
|
||||
# via google-api-python-client
|
||||
urllib3==2.2.1
|
||||
urllib3==2.2.2
|
||||
# via requests
|
||||
uvicorn[standard]==0.30.1
|
||||
# via fastapi
|
||||
uvloop==0.19.0
|
||||
# via uvicorn
|
||||
verboselogs==1.7
|
||||
# via deepgram-sdk
|
||||
watchfiles==0.22.0
|
||||
# via uvicorn
|
||||
websockets==12.0
|
||||
# via
|
||||
# cartesia
|
||||
# deepgram-sdk
|
||||
# pipecat-ai (pyproject.toml)
|
||||
# uvicorn
|
||||
werkzeug==3.0.3
|
||||
# via flask
|
||||
yarl==1.9.4
|
||||
|
||||
@@ -37,7 +37,7 @@ Website = "https://pipecat.ai"
 anthropic = [ "anthropic~=0.25.7" ]
 azure = [ "azure-cognitiveservices-speech~=1.37.0" ]
 cartesia = [ "numpy~=1.26.0", "sounddevice", "cartesia" ]
-daily = [ "daily-python~=0.9.0" ]
+daily = [ "daily-python~=0.10.0" ]
 deepgram = [ "deepgram-sdk~=3.2.7" ]
 examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ]
 fal = [ "fal-client~=0.4.0" ]
@@ -50,7 +50,7 @@ openai = [ "openai~=1.26.0" ]
 openpipe = [ "openpipe~=4.14.0" ]
 playht = [ "pyht~=0.0.28" ]
 silero = [ "torch~=2.3.0", "torchaudio~=2.3.0" ]
-websocket = [ "websockets~=12.0" ]
+websocket = [ "websockets~=12.0", "fastapi~=0.111.0" ]
 whisper = [ "faster-whisper~=1.0.2" ]

 [tool.setuptools.packages.find]
@@ -71,6 +71,8 @@ class PipelineTask:
        await self._source.process_frame(CancelFrame(), FrameDirection.DOWNSTREAM)
        self._process_down_task.cancel()
        self._process_up_task.cancel()
        await self._process_down_task
        await self._process_up_task

    async def run(self):
        self._process_up_task = asyncio.create_task(self._process_up_queue())
@@ -122,6 +124,7 @@ class PipelineTask:
        await self._pipeline.cleanup()
        # We just enqueue None to terminate the task gracefully.
        self._process_up_task.cancel()
        await self._process_up_task

    async def _process_up_queue(self):
        while True:
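The hunks above pair each `cancel()` with an `await` on the same task, so the caller waits until the task has actually finished unwinding. A minimal standalone sketch of that pattern follows. It is not the `PipelineTask` code, and `worker`, `stop`, and `main` are hypothetical names. Awaiting a cancelled task re-raises `asyncio.CancelledError` in the awaiter unless the task itself swallows it, which is why the sketch catches it.

```python
import asyncio


async def worker(log: list) -> None:
    """Loop until cancelled, recording that cleanup ran."""
    try:
        while True:
            await asyncio.sleep(0.01)
    finally:
        log.append("cleaned up")


async def stop(task: asyncio.Task) -> None:
    """Cancel a task and wait for it to finish unwinding."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task ended because we cancelled it


async def main() -> list:
    log: list = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.05)  # give the worker time to start
    await stop(task)
    log.append(f"cancelled={task.cancelled()}")
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Without the `await`, `stop` could return while the worker's `finally` block has not yet run, so later cleanup might race with it.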
@@ -12,9 +12,9 @@ from pipecat.frames.frames import Frame
 class FrameSerializer(ABC):

     @abstractmethod
-    def serialize(self, frame: Frame) -> bytes:
+    def serialize(self, frame: Frame) -> str | bytes | None:
         pass

     @abstractmethod
-    def deserialize(self, data: bytes) -> Frame | None:
+    def deserialize(self, data: str | bytes) -> Frame | None:
         pass
@@ -26,7 +26,7 @@ class ProtobufFrameSerializer(FrameSerializer):
     def __init__(self):
         pass

-    def serialize(self, frame: Frame) -> bytes:
+    def serialize(self, frame: Frame) -> str | bytes | None:
         proto_frame = frame_protos.Frame()
         if type(frame) not in self.SERIALIZABLE_TYPES:
             raise ValueError(
@@ -41,7 +41,7 @@ class ProtobufFrameSerializer(FrameSerializer):
         result = proto_frame.SerializeToString()
         return result

-    def deserialize(self, data: bytes) -> Frame | None:
+    def deserialize(self, data: str | bytes) -> Frame | None:
         """Returns a Frame object from a Frame protobuf. Used to convert frames
         passed over the wire as protobufs to Frame objects used in pipelines
         and frame processors.
55
src/pipecat/serializers/twilio.py
Normal file
@@ -0,0 +1,55 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import base64
+import json
+
+from pipecat.frames.frames import AudioRawFrame, Frame
+from pipecat.serializers.base_serializer import FrameSerializer
+from pipecat.utils.audio import ulaw_8000_to_pcm_16000, pcm_16000_to_ulaw_8000
+
+
+class TwilioFrameSerializer(FrameSerializer):
+    SERIALIZABLE_TYPES = {
+        AudioRawFrame: "audio",
+    }
+
+    def __init__(self):
+        self._sid = None
+
+    def serialize(self, frame: Frame) -> str | bytes | None:
+        if not isinstance(frame, AudioRawFrame):
+            return None
+
+        data = frame.audio
+
+        serialized_data = pcm_16000_to_ulaw_8000(data)
+        payload = base64.b64encode(serialized_data).decode("utf-8")
+        answer = {
+            "event": "media",
+            "streamSid": self._sid,
+            "media": {
+                "payload": payload
+            }
+        }
+
+        return json.dumps(answer)
+
+    def deserialize(self, data: str | bytes) -> Frame | None:
+        message = json.loads(data)
+
+        if not self._sid:
+            self._sid = message["streamSid"] if "streamSid" in message else None
+
+        if message["event"] != "media":
+            return None
+        else:
+            payload_base64 = message["media"]["payload"]
+            payload = base64.b64decode(payload_base64)
+
+            deserialized_data = ulaw_8000_to_pcm_16000(payload)
+            audio_frame = AudioRawFrame(audio=deserialized_data, num_channels=1, sample_rate=16000)
+            return audio_frame
|
||||
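The JSON envelope that `TwilioFrameSerializer` reads and writes can be tried in isolation. The following is a minimal sketch, not the serializer itself: it leaves out the mu-law/PCM conversion done by the `pipecat.utils.audio` helpers and treats the audio as opaque bytes. The function names `encode_media_message` and `decode_media_message` are hypothetical and exist only for this illustration.

```python
import base64
import json


def encode_media_message(stream_sid, audio: bytes) -> str:
    """Wrap already-encoded audio bytes in a "media" event message."""
    payload = base64.b64encode(audio).decode("utf-8")
    return json.dumps({
        "event": "media",
        "streamSid": stream_sid,
        "media": {"payload": payload},
    })


def decode_media_message(data):
    """Return (stream_sid, audio_bytes), or None for any non-"media" event."""
    message = json.loads(data)
    if message.get("event") != "media":
        return None
    audio = base64.b64decode(message["media"]["payload"])
    return message.get("streamSid"), audio
```

Because the payload is base64 text inside JSON, the message travels as a text websocket frame, which is presumably why the serializer return type was widened to include `str`.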
@@ -21,6 +21,7 @@ from pipecat.frames.frames import (
     TTSStoppedFrame,
     TextFrame,
     VisionImageRawFrame,
+    LLMFullResponseEndFrame,
 )
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 from pipecat.utils.audio import calculate_audio_volume
@@ -110,7 +111,9 @@ class TTSService(AIService):
             text = frame.text
         else:
             self._current_sentence += frame.text
-            if self._current_sentence.strip().endswith((".", "?", "!")):
+            if self._current_sentence.strip().endswith(
+                    (".", "?", "!")) and not self._current_sentence.strip().endswith(
+                    ("Mr.", "Mrs.", "Ms.", "Dr.")):
                 text = self._current_sentence.strip()
                 self._current_sentence = ""
 
@@ -134,6 +137,11 @@ class TTSService(AIService):
             if self._current_sentence:
                 await self._push_tts_frames(self._current_sentence)
             await self.push_frame(frame)
+        elif isinstance(frame, LLMFullResponseEndFrame):
+            if self._current_sentence:
+                await self._push_tts_frames(self._current_sentence.strip())
+                self._current_sentence = ""
+            await self.push_frame(frame)
         else:
             await self.push_frame(frame, direction)
 
|
||||
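The sentence aggregation changed in `TTSService` can be sketched outside the pipeline. This is an illustrative example only, using the titles Mr., Mrs., Ms. and Dr. as the abbreviation list; the names `ends_sentence` and `aggregate` are hypothetical and not part of pipecat. Text accumulates until it ends in sentence punctuation that is not part of a title, and whatever is left is flushed at the end of the stream, much as the `LLMFullResponseEndFrame` branch does.

```python
SENTENCE_ENDINGS = (".", "?", "!")
ABBREVIATIONS = ("Mr.", "Mrs.", "Ms.", "Dr.")


def ends_sentence(text: str) -> bool:
    """True if text ends a sentence and the final period is not a title."""
    stripped = text.strip()
    return stripped.endswith(SENTENCE_ENDINGS) and not stripped.endswith(ABBREVIATIONS)


def aggregate(tokens):
    """Yield complete sentences from streamed tokens, then flush the remainder."""
    current = ""
    for token in tokens:
        current += token
        if ends_sentence(current):
            yield current.strip()
            current = ""
    # End of response: speak whatever is left even without punctuation.
    if current.strip():
        yield current.strip()
```

For instance, feeding the tokens `"Hello "`, `"Dr."`, `" Smith"`, `"."` produces a single sentence instead of splitting after the title.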
@@ -146,10 +146,11 @@ class AzureSTTService(AIService):
     async def cancel(self, frame: CancelFrame):
         self._speech_recognizer.stop_continuous_recognition_async()
         self._push_frame_task.cancel()
+        await self._push_frame_task
 
     def _create_push_task(self):
-        self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
         self._push_queue = asyncio.Queue()
+        self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
 
     async def _push_frame_task_handler(self):
         running = True
@@ -89,6 +89,7 @@ class DeepgramTTSService(TTSService):
 class DeepgramSTTService(AIService):
     def __init__(self,
                  api_key: str,
+                 url: str = "",
                  live_options: LiveOptions = LiveOptions(
                      encoding="linear16",
                      language="en-US",
@@ -104,7 +105,7 @@ class DeepgramSTTService(AIService):
         self._live_options = live_options
 
         self._client = DeepgramClient(
-            api_key, config=DeepgramClientOptions(options={"keepalive": "true"}))
+            api_key, config=DeepgramClientOptions(url=url, options={"keepalive": "true"}))
         self._connection = self._client.listen.asynclive.v("1")
         self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
 
@@ -134,10 +135,11 @@ class DeepgramSTTService(AIService):
     async def cancel(self, frame: CancelFrame):
         await self._connection.finish()
         self._push_frame_task.cancel()
+        await self._push_frame_task
 
     def _create_push_task(self):
-        self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
         self._push_queue = asyncio.Queue()
+        self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
 
     async def _push_frame_task_handler(self):
         running = True
@@ -5,7 +5,6 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import queue
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
@@ -33,8 +32,6 @@ class BaseInputTransport(FrameProcessor):
|
||||
|
||||
self._params = params
|
||||
|
||||
self._running = False
|
||||
|
||||
self._executor = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
# Create push frame task. This is the task that will push frames in
|
||||
@@ -42,34 +39,21 @@ class BaseInputTransport(FrameProcessor):
|
||||
self._create_push_task()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
|
||||
# Create audio input queue and thread if needed.
|
||||
# Create audio input queue and task if needed.
|
||||
if self._params.audio_in_enabled or self._params.vad_enabled:
|
||||
self._audio_in_queue = queue.Queue()
|
||||
self._audio_thread = self._loop.run_in_executor(
|
||||
self._executor, self._audio_thread_handler)
|
||||
self._audio_in_queue = asyncio.Queue()
|
||||
self._audio_task = self.get_event_loop().create_task(self._audio_task_handler())
|
||||
|
||||
async def stop(self):
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
# This will exit all threads.
|
||||
self._running = False
|
||||
|
||||
# Wait for the threads to finish.
|
||||
# Wait for the task to finish.
|
||||
if self._params.audio_in_enabled or self._params.vad_enabled:
|
||||
await self._audio_thread
|
||||
|
||||
self._push_frame_task.cancel()
|
||||
self._audio_task.cancel()
|
||||
await self._audio_task
|
||||
|
||||
def vad_analyzer(self) -> VADAnalyzer | None:
|
||||
return self._params.vad_analyzer
|
||||
|
||||
def push_audio_frame(self, frame: AudioRawFrame):
|
||||
async def push_audio_frame(self, frame: AudioRawFrame):
|
||||
if self._params.audio_in_enabled or self._params.vad_enabled:
|
||||
self._audio_in_queue.put_nowait(frame)
|
||||
|
||||
@@ -78,7 +62,8 @@ class BaseInputTransport(FrameProcessor):
|
||||
#
|
||||
|
||||
async def cleanup(self):
|
||||
pass
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
@@ -102,8 +87,8 @@ class BaseInputTransport(FrameProcessor):
|
||||
|
||||
def _create_push_task(self):
|
||||
loop = self.get_event_loop()
|
||||
self._push_frame_task = loop.create_task(self._push_frame_task_handler())
|
||||
self._push_queue = asyncio.Queue()
|
||||
self._push_frame_task = loop.create_task(self._push_frame_task_handler())
|
||||
|
||||
async def _internal_push_frame(
|
||||
self,
|
||||
@@ -129,6 +114,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
logger.debug("User started speaking")
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
self._create_push_task()
|
||||
await self.push_frame(StartInterruptionFrame())
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
@@ -140,15 +126,16 @@ class BaseInputTransport(FrameProcessor):
|
||||
# Audio input
|
||||
#
|
||||
|
||||
def _vad_analyze(self, audio_frames: bytes) -> VADState:
|
||||
async def _vad_analyze(self, audio_frames: bytes) -> VADState:
|
||||
state = VADState.QUIET
|
||||
vad_analyzer = self.vad_analyzer()
|
||||
if vad_analyzer:
|
||||
state = vad_analyzer.analyze_audio(audio_frames)
|
||||
state = await self.get_event_loop().run_in_executor(
|
||||
self._executor, vad_analyzer.analyze_audio, audio_frames)
|
||||
return state
|
||||
|
||||
def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
|
||||
new_vad_state = self._vad_analyze(audio_frames)
|
||||
async def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
|
||||
new_vad_state = await self._vad_analyze(audio_frames)
|
||||
if new_vad_state != vad_state and new_vad_state != VADState.STARTING and new_vad_state != VADState.STOPPING:
|
||||
frame = None
|
||||
if new_vad_state == VADState.SPEAKING:
|
||||
@@ -157,33 +144,29 @@ class BaseInputTransport(FrameProcessor):
|
||||
frame = UserStoppedSpeakingFrame()
|
||||
|
||||
if frame:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._handle_interruptions(frame), self.get_event_loop())
|
||||
future.result()
|
||||
await self._handle_interruptions(frame)
|
||||
|
||||
vad_state = new_vad_state
|
||||
return vad_state
|
||||
|
||||
def _audio_thread_handler(self):
|
||||
async def _audio_task_handler(self):
|
||||
vad_state: VADState = VADState.QUIET
|
||||
while self._running:
|
||||
while True:
|
||||
try:
|
||||
frame: AudioRawFrame = self._audio_in_queue.get(timeout=1)
|
||||
frame: AudioRawFrame = await self._audio_in_queue.get()
|
||||
|
||||
audio_passthrough = True
|
||||
|
||||
# Check VAD and push event if necessary. We just care about
|
||||
# changes from QUIET to SPEAKING and vice versa.
|
||||
if self._params.vad_enabled:
|
||||
vad_state = self._handle_vad(frame.audio, vad_state)
|
||||
vad_state = await self._handle_vad(frame.audio, vad_state)
|
||||
audio_passthrough = self._params.vad_audio_passthrough
|
||||
|
||||
# Push audio downstream if passthrough.
|
||||
if audio_passthrough:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._internal_push_frame(frame), self._loop)
|
||||
future.result()
|
||||
except queue.Empty:
|
||||
pass
|
||||
await self._internal_push_frame(frame)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except BaseException as e:
|
||||
logger.error(f"{self} error reading audio frames: {e}")
|
||||
|
||||
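The task handling introduced in `BaseInputTransport` follows a recurring pattern: create the queue before the task that reads it, cancel the task, await it so the handler has really exited, then recreate it. The sketch below shows that pattern with plain `asyncio`. The `Worker` class and its method names are hypothetical and only loosely modelled on the diff.

```python
import asyncio


class Worker:
    def __init__(self):
        self.processed = []
        self._queue = None
        self._task = None

    def create_task(self):
        # Create the queue first so the handler never runs without one.
        self._queue = asyncio.Queue()
        self._task = asyncio.get_running_loop().create_task(self._handler())

    async def push(self, item):
        await self._queue.put(item)

    async def restart(self):
        # Cancel, wait for the handler to exit, then start with an empty queue.
        self._task.cancel()
        await self._task
        self.create_task()

    async def stop(self):
        self._task.cancel()
        await self._task

    async def _handler(self):
        while True:
            try:
                item = await self._queue.get()
                self.processed.append(item)
                self._queue.task_done()
            except asyncio.CancelledError:
                # Leave the loop so that awaiting the task returns normally.
                break


async def demo():
    worker = Worker()
    worker.create_task()
    await worker.push("a")
    await worker._queue.join()
    await worker.restart()  # roughly what an interruption does in the diff
    await worker.push("b")
    await worker._queue.join()
    await worker.stop()
    return worker.processed
```

Recreating the queue on restart drops any items that were still pending, which is the desired effect when an interruption should discard queued frames.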
@@ -7,11 +7,6 @@
|
||||
|
||||
import asyncio
|
||||
import itertools
|
||||
import queue
|
||||
import time
|
||||
import threading
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from PIL import Image
|
||||
from typing import List
|
||||
@@ -42,67 +37,51 @@ class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
self._params = params
|
||||
|
||||
self._running = False
|
||||
|
||||
self._executor = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
# These are the images that we should send to the camera at our desired
|
||||
# framerate.
|
||||
self._camera_images = None
|
||||
|
||||
# Create media threads queues.
|
||||
if self._params.camera_out_enabled:
|
||||
self._camera_out_queue = queue.Queue()
|
||||
self._sink_queue = queue.Queue()
|
||||
self._sink_thread = None
|
||||
|
||||
self._stopped_event = asyncio.Event()
|
||||
self._is_interrupted = threading.Event()
|
||||
|
||||
# We will write 20ms audio at a time. If we receive long audio frames we
|
||||
# will chunk them. This will help with interruption handling.
|
||||
audio_bytes_10ms = int(self._params.audio_out_sample_rate / 100) * \
|
||||
self._params.audio_out_channels * 2
|
||||
self._audio_chunk_size = audio_bytes_10ms * 2
|
||||
|
||||
self._stopped_event = asyncio.Event()
|
||||
|
||||
# Create sink frame task. This is the task that will actually write
|
||||
# audio or video frames. We write audio/video in a task so we can keep
|
||||
# generating frames upstream while, for example, the audio is playing.
|
||||
self._create_sink_task()
|
||||
|
||||
# Create push frame task. This is the task that will push frames in
|
||||
# order. We also guarantee that all frames are pushed in the same task.
|
||||
self._create_push_task()
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
|
||||
loop = self.get_event_loop()
|
||||
|
||||
# Create queues and threads.
|
||||
# Create media threads queues.
|
||||
if self._params.camera_out_enabled:
|
||||
self._camera_out_thread = loop.run_in_executor(
|
||||
self._executor, self._camera_out_thread_handler)
|
||||
|
||||
self._sink_thread = loop.run_in_executor(self._executor, self._sink_thread_handler)
|
||||
self._camera_out_queue = asyncio.Queue()
|
||||
self._camera_out_task = self.get_event_loop().create_task(self._camera_out_task_handler())
|
||||
|
||||
async def stop(self):
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
# This will exit all threads.
|
||||
self._running = False
|
||||
# Wait on the threads to finish.
|
||||
if self._params.camera_out_enabled:
|
||||
self._camera_out_task.cancel()
|
||||
await self._camera_out_task
|
||||
|
||||
self._stopped_event.set()
|
||||
|
||||
def send_message(self, frame: TransportMessageFrame):
|
||||
async def send_message(self, frame: TransportMessageFrame):
|
||||
pass
|
||||
|
||||
def send_metrics(self, frame: MetricsFrame):
|
||||
async def send_metrics(self, frame: MetricsFrame):
|
||||
pass
|
||||
|
||||
def write_frame_to_camera(self, frame: ImageRawFrame):
|
||||
async def write_frame_to_camera(self, frame: ImageRawFrame):
|
||||
pass
|
||||
|
||||
def write_raw_audio_frames(self, frames: bytes):
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
pass
|
||||
|
||||
#
|
||||
@@ -110,12 +89,12 @@ class BaseOutputTransport(FrameProcessor):
|
||||
#
|
||||
|
||||
async def cleanup(self):
|
||||
# Wait on the threads to finish.
|
||||
if self._params.camera_out_enabled:
|
||||
await self._camera_out_thread
|
||||
if self._sink_task:
|
||||
self._sink_task.cancel()
|
||||
await self._sink_task
|
||||
|
||||
if self._sink_thread:
|
||||
await self._sink_thread
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
@@ -128,7 +107,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
if isinstance(frame, StartFrame):
|
||||
await self.start(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
# EndFrame is managed in the queue handler.
|
||||
# EndFrame is managed in the sink queue handler.
|
||||
elif isinstance(frame, CancelFrame):
|
||||
await self.stop()
|
||||
await self.push_frame(frame, direction)
|
||||
@@ -136,14 +115,14 @@ class BaseOutputTransport(FrameProcessor):
|
||||
await self._handle_interruptions(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, MetricsFrame):
|
||||
self.send_metrics(frame)
|
||||
await self.send_metrics(frame)
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, SystemFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, AudioRawFrame):
|
||||
await self._handle_audio(frame)
|
||||
else:
|
||||
self._sink_queue.put_nowait(frame)
|
||||
await self._sink_queue.put(frame)
|
||||
|
||||
# If we are finishing, wait here until we have stopped, otherwise we might
|
||||
# close things too early upstream. We need this event because we don't
|
||||
@@ -156,50 +135,51 @@ class BaseOutputTransport(FrameProcessor):
|
||||
return
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
self._is_interrupted.set()
|
||||
# Stop sink task.
|
||||
self._sink_task.cancel()
|
||||
await self._sink_task
|
||||
self._create_sink_task()
|
||||
# Stop push task.
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
self._create_push_task()
|
||||
elif isinstance(frame, StopInterruptionFrame):
|
||||
self._is_interrupted.clear()
|
||||
|
||||
async def _handle_audio(self, frame: AudioRawFrame):
|
||||
audio = frame.audio
|
||||
for i in range(0, len(audio), self._audio_chunk_size):
|
||||
chunk = AudioRawFrame(audio[i: i + self._audio_chunk_size],
|
||||
sample_rate=frame.sample_rate, num_channels=frame.num_channels)
|
||||
self._sink_queue.put_nowait(chunk)
|
||||
await self._sink_queue.put(chunk)
|
||||
|
||||
def _sink_thread_handler(self):
|
||||
def _create_sink_task(self):
|
||||
loop = self.get_event_loop()
|
||||
self._sink_queue = asyncio.Queue()
|
||||
self._sink_task = loop.create_task(self._sink_task_handler())
|
||||
|
||||
async def _sink_task_handler(self):
|
||||
# Audio accumulation buffer
|
||||
buffer = bytearray()
|
||||
while self._running:
|
||||
while True:
|
||||
try:
|
||||
frame = self._sink_queue.get(timeout=1)
|
||||
if not self._is_interrupted.is_set():
|
||||
if isinstance(frame, AudioRawFrame) and self._params.audio_out_enabled:
|
||||
buffer.extend(frame.audio)
|
||||
buffer = self._maybe_send_audio(buffer)
|
||||
elif isinstance(frame, ImageRawFrame) and self._params.camera_out_enabled:
|
||||
self._set_camera_image(frame)
|
||||
elif isinstance(frame, SpriteFrame) and self._params.camera_out_enabled:
|
||||
self._set_camera_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
self.send_message(frame)
|
||||
else:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._internal_push_frame(frame), self.get_event_loop())
|
||||
future.result()
|
||||
frame = await self._sink_queue.get()
|
||||
if isinstance(frame, AudioRawFrame) and self._params.audio_out_enabled:
|
||||
buffer.extend(frame.audio)
|
||||
buffer = await self._maybe_send_audio(buffer)
|
||||
elif isinstance(frame, ImageRawFrame) and self._params.camera_out_enabled:
|
||||
await self._set_camera_image(frame)
|
||||
elif isinstance(frame, SpriteFrame) and self._params.camera_out_enabled:
|
||||
await self._set_camera_images(frame.images)
|
||||
elif isinstance(frame, TransportMessageFrame):
|
||||
await self.send_message(frame)
|
||||
else:
|
||||
# If we get interrupted just clear the output buffer.
|
||||
buffer = bytearray()
|
||||
await self._internal_push_frame(frame)
|
||||
|
||||
if isinstance(frame, EndFrame):
|
||||
future = asyncio.run_coroutine_threadsafe(self.stop(), self.get_event_loop())
|
||||
future.result()
|
||||
await self.stop()
|
||||
|
||||
self._sink_queue.task_done()
|
||||
except queue.Empty:
|
||||
pass
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except BaseException as e:
|
||||
logger.error(f"{self} error processing sink queue: {e}")
|
||||
|
||||
@@ -209,8 +189,8 @@ class BaseOutputTransport(FrameProcessor):
|
||||
|
||||
def _create_push_task(self):
|
||||
loop = self.get_event_loop()
|
||||
self._push_frame_task = loop.create_task(self._push_frame_task_handler())
|
||||
self._push_queue = asyncio.Queue()
|
||||
self._push_frame_task = loop.create_task(self._push_frame_task_handler())
|
||||
|
||||
async def _internal_push_frame(
|
||||
self,
|
||||
@@ -233,7 +213,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
async def send_image(self, frame: ImageRawFrame | SpriteFrame):
|
||||
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
def _draw_image(self, frame: ImageRawFrame):
|
||||
async def _draw_image(self, frame: ImageRawFrame):
|
||||
desired_size = (self._params.camera_out_width, self._params.camera_out_height)
|
||||
|
||||
if frame.size != desired_size:
|
||||
@@ -243,32 +223,32 @@ class BaseOutputTransport(FrameProcessor):
|
||||
f"{frame} does not have the expected size {desired_size}, resizing")
|
||||
frame = ImageRawFrame(resized_image.tobytes(), resized_image.size, resized_image.format)
|
||||
|
||||
self.write_frame_to_camera(frame)
|
||||
await self.write_frame_to_camera(frame)
|
||||
|
||||
def _set_camera_image(self, image: ImageRawFrame):
|
||||
async def _set_camera_image(self, image: ImageRawFrame):
|
||||
if self._params.camera_out_is_live:
|
||||
self._camera_out_queue.put_nowait(image)
|
||||
await self._camera_out_queue.put(image)
|
||||
else:
|
||||
self._camera_images = itertools.cycle([image])
|
||||
|
||||
def _set_camera_images(self, images: List[ImageRawFrame]):
|
||||
async def _set_camera_images(self, images: List[ImageRawFrame]):
|
||||
self._camera_images = itertools.cycle(images)
|
||||
|
||||
def _camera_out_thread_handler(self):
|
||||
while self._running:
|
||||
async def _camera_out_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
if self._params.camera_out_is_live:
|
||||
image = self._camera_out_queue.get(timeout=1)
|
||||
self._draw_image(image)
|
||||
image = await self._camera_out_queue.get()
|
||||
await self._draw_image(image)
|
||||
self._camera_out_queue.task_done()
|
||||
elif self._camera_images:
|
||||
image = next(self._camera_images)
|
||||
self._draw_image(image)
|
||||
time.sleep(1.0 / self._params.camera_out_framerate)
|
||||
await self._draw_image(image)
|
||||
await asyncio.sleep(1.0 / self._params.camera_out_framerate)
|
||||
else:
|
||||
time.sleep(1.0 / self._params.camera_out_framerate)
|
||||
except queue.Empty:
|
||||
pass
|
||||
await asyncio.sleep(1.0 / self._params.camera_out_framerate)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error writing to camera: {e}")
|
||||
|
||||
@@ -279,12 +259,8 @@ class BaseOutputTransport(FrameProcessor):
|
||||
async def send_audio(self, frame: AudioRawFrame):
|
||||
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
def _maybe_send_audio(self, buffer: bytearray) -> bytearray:
|
||||
try:
|
||||
if len(buffer) >= self._audio_chunk_size:
|
||||
self.write_raw_audio_frames(bytes(buffer[:self._audio_chunk_size]))
|
||||
buffer = buffer[self._audio_chunk_size:]
|
||||
return buffer
|
||||
except BaseException as e:
|
||||
logger.error(f"{self} error writing audio frames: {e}")
|
||||
return buffer
|
||||
async def _maybe_send_audio(self, buffer: bytearray) -> bytearray:
|
||||
if len(buffer) >= self._audio_chunk_size:
|
||||
await self.write_raw_audio_frames(bytes(buffer[:self._audio_chunk_size]))
|
||||
buffer = buffer[self._audio_chunk_size:]
|
||||
return buffer
|
||||
|
||||
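The 20 ms chunking used by `BaseOutputTransport` comes down to a little arithmetic on 16-bit PCM. The following sketch assumes 2 bytes per sample, as the `* 2` in the diff implies. The helper names `audio_chunk_size`, `split_audio` and `drain` are hypothetical.

```python
def audio_chunk_size(sample_rate: int, channels: int, chunk_ms: int = 20) -> int:
    """Bytes in chunk_ms of 16-bit PCM audio (chunk_ms must be a multiple of 10)."""
    bytes_10ms = int(sample_rate / 100) * channels * 2
    return bytes_10ms * (chunk_ms // 10)


def split_audio(audio: bytes, chunk_size: int):
    """Split a long audio buffer into chunk_size pieces (the last may be shorter)."""
    return [audio[i:i + chunk_size] for i in range(0, len(audio), chunk_size)]


def drain(buffer: bytearray, chunk_size: int):
    """Pop one full chunk off the front of the buffer, if there is one."""
    if len(buffer) >= chunk_size:
        return bytes(buffer[:chunk_size]), buffer[chunk_size:]
    return None, buffer
```

At 16 kHz mono this gives 640 bytes per 20 ms chunk. Small chunks keep the amount of already-queued audio short, which helps an interruption take effect quickly.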
@@ -6,6 +6,8 @@
|
||||
|
||||
import asyncio
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from pipecat.frames.frames import AudioRawFrame, StartFrame
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.transports.base_input import BaseInputTransport
|
||||
@@ -43,26 +45,20 @@ class LocalAudioInputTransport(BaseInputTransport):
|
||||
await super().start(frame)
|
||||
self._in_stream.start_stream()
|
||||
|
||||
async def stop(self):
|
||||
await super().stop()
|
||||
self._in_stream.stop_stream()
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
self._in_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._in_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._in_stream.close()
|
||||
|
||||
await super().cleanup()
|
||||
|
||||
def _audio_in_callback(self, in_data, frame_count, time_info, status):
|
||||
if not self._running:
|
||||
return (None, pyaudio.paAbort)
|
||||
|
||||
frame = AudioRawFrame(audio=in_data,
|
||||
sample_rate=self._params.audio_in_sample_rate,
|
||||
num_channels=self._params.audio_in_channels)
|
||||
self.push_audio_frame(frame)
|
||||
|
||||
asyncio.run_coroutine_threadsafe(self.push_audio_frame(frame), self.get_event_loop())
|
||||
|
||||
return (None, pyaudio.paContinue)
|
||||
|
||||
@@ -72,19 +68,29 @@ class LocalAudioOutputTransport(BaseOutputTransport):
|
||||
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
|
||||
super().__init__(params)
|
||||
|
||||
self._executor = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
self._out_stream = py_audio.open(
|
||||
format=py_audio.get_format_from_width(2),
|
||||
channels=params.audio_out_channels,
|
||||
rate=params.audio_out_sample_rate,
|
||||
output=True)
|
||||
|
||||
def write_raw_audio_frames(self, frames: bytes):
|
||||
self._out_stream.write(frames)
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._out_stream.start_stream()
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
self._out_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._out_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._out_stream.close()
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
await self.get_event_loop().run_in_executor(self._executor, self._out_stream.write, frames)
|
||||
|
||||
|
||||
class LocalAudioTransport(BaseTransport):
|
||||
|
||||
|
||||
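The local audio transports cross the thread boundary in both directions. The PyAudio input callback runs on a non-asyncio thread and hands frames to the loop with `asyncio.run_coroutine_threadsafe`, while the blocking output write is moved off the loop with `run_in_executor`. The sketch below reproduces both directions with the standard library only. The callback thread is a stand-in for the audio driver and the chunk values are made up.

```python
import asyncio
import threading


async def demo():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()

    def callback_thread():
        # Runs outside the event loop, like an audio driver callback.
        for chunk in (b"one", b"two", b"three"):
            asyncio.run_coroutine_threadsafe(queue.put(chunk), loop)

    thread = threading.Thread(target=callback_thread)
    thread.start()

    # The loop side consumes frames with a plain await.
    received = [await queue.get() for _ in range(3)]

    # Blocking calls go the other way: run them in an executor so the
    # event loop stays free while they complete.
    await loop.run_in_executor(None, thread.join)
    return received
```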
@@ -6,6 +6,8 @@
|
||||
|
||||
import asyncio
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import numpy as np
|
||||
import tkinter as tk
|
||||
|
||||
@@ -53,25 +55,20 @@ class TkInputTransport(BaseInputTransport):
|
||||
await super().start(frame)
|
||||
self._in_stream.start_stream()
|
||||
|
||||
async def stop(self):
|
||||
await super().stop()
|
||||
self._in_stream.stop_stream()
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
self._in_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._in_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._in_stream.close()
|
||||
|
||||
def _audio_in_callback(self, in_data, frame_count, time_info, status):
|
||||
if not self._running:
|
||||
return (None, pyaudio.paAbort)
|
||||
|
||||
frame = AudioRawFrame(audio=in_data,
|
||||
sample_rate=self._params.audio_in_sample_rate,
|
||||
num_channels=self._params.audio_in_channels)
|
||||
self.push_audio_frame(frame)
|
||||
|
||||
asyncio.run_coroutine_threadsafe(self.push_audio_frame(frame), self.get_event_loop())
|
||||
|
||||
return (None, pyaudio.paContinue)
|
||||
|
||||
@@ -81,6 +78,8 @@ class TkOutputTransport(BaseOutputTransport):
|
||||
def __init__(self, tk_root: tk.Tk, py_audio: pyaudio.PyAudio, params: TransportParams):
|
||||
super().__init__(params)
|
||||
|
||||
self._executor = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
self._out_stream = py_audio.open(
|
||||
format=py_audio.get_format_from_width(2),
|
||||
channels=params.audio_out_channels,
|
||||
@@ -94,16 +93,24 @@ class TkOutputTransport(BaseOutputTransport):
|
||||
self._image_label = tk.Label(tk_root, image=photo)
|
||||
self._image_label.pack()
|
||||
|
||||
def write_raw_audio_frames(self, frames: bytes):
|
||||
self._out_stream.write(frames)
|
||||
|
||||
def write_frame_to_camera(self, frame: ImageRawFrame):
|
||||
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
self._out_stream.start_stream()
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
self._out_stream.stop_stream()
|
||||
# This is not very pretty (taken from PyAudio docs).
|
||||
while self._out_stream.is_active():
|
||||
await asyncio.sleep(0.1)
|
||||
self._out_stream.close()
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
await self.get_event_loop().run_in_executor(self._executor, self._out_stream.write, frames)
|
||||
|
||||
async def write_frame_to_camera(self, frame: ImageRawFrame):
|
||||
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
|
||||
|
||||
def _write_frame_to_tk(self, frame: ImageRawFrame):
|
||||
width = frame.size[0]
|
||||
height = frame.size[1]
|
||||
|
||||
160
src/pipecat/transports/network/fastapi_websocket.py
Normal file
@@ -0,0 +1,160 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import asyncio
|
||||
import io
|
||||
import wave
|
||||
|
||||
from typing import Awaitable, Callable
|
||||
from pydantic.main import BaseModel
|
||||
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
from pipecat.frames.frames import AudioRawFrame, StartFrame
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
from pipecat.transports.base_input import BaseInputTransport
|
||||
from pipecat.transports.base_output import BaseOutputTransport
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
from fastapi import WebSocket
|
||||
from starlette.websockets import WebSocketState
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use FastAPI websockets, you need to `pip install pipecat-ai[websocket]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class FastAPIWebsocketParams(TransportParams):
|
||||
add_wav_header: bool = False
|
||||
audio_frame_size: int = 6400 # 200ms
|
||||
serializer: FrameSerializer = TwilioFrameSerializer()
|
||||
|
||||
|
||||
class FastAPIWebsocketCallbacks(BaseModel):
|
||||
on_client_connected: Callable[[WebSocket], Awaitable[None]]
|
||||
on_client_disconnected: Callable[[WebSocket], Awaitable[None]]
|
||||
|
||||
|
||||
class FastAPIWebsocketInputTransport(BaseInputTransport):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
websocket: WebSocket,
|
||||
params: FastAPIWebsocketParams,
|
||||
callbacks: FastAPIWebsocketCallbacks,
|
||||
**kwargs):
|
||||
super().__init__(params, **kwargs)
|
||||
|
||||
self._websocket = websocket
|
||||
self._params = params
|
||||
self._callbacks = callbacks
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await self._callbacks.on_client_connected(self._websocket)
|
||||
await super().start(frame)
|
||||
self._receive_task = self.get_event_loop().create_task(self._receive_messages())
|
||||
|
||||
async def stop(self):
|
||||
if self._websocket.client_state != WebSocketState.DISCONNECTED:
|
||||
await self._websocket.close()
|
||||
await super().stop()
|
||||
|
||||
async def _receive_messages(self):
|
||||
async for message in self._websocket.iter_text():
|
||||
frame = self._params.serializer.deserialize(message)
|
||||
|
||||
if not frame:
|
||||
continue
|
||||
|
||||
if isinstance(frame, AudioRawFrame):
|
||||
await self.push_audio_frame(frame)
|
||||
|
||||
await self._callbacks.on_client_disconnected(self._websocket)
|
||||
|
||||
|
||||
+class FastAPIWebsocketOutputTransport(BaseOutputTransport):
+
+    def __init__(self, websocket: WebSocket, params: FastAPIWebsocketParams, **kwargs):
+        super().__init__(params, **kwargs)
+
+        self._websocket = websocket
+        self._params = params
+        self._audio_buffer = bytes()
+
+    async def write_raw_audio_frames(self, frames: bytes):
+        self._audio_buffer += frames
+        while len(self._audio_buffer) >= self._params.audio_frame_size:
+            frame = AudioRawFrame(
+                audio=self._audio_buffer[:self._params.audio_frame_size],
+                sample_rate=self._params.audio_out_sample_rate,
+                num_channels=self._params.audio_out_channels
+            )
+
+            if self._params.add_wav_header:
+                content = io.BytesIO()
+                ww = wave.open(content, "wb")
+                ww.setsampwidth(2)
+                ww.setnchannels(frame.num_channels)
+                ww.setframerate(frame.sample_rate)
+                ww.writeframes(frame.audio)
+                ww.close()
+                content.seek(0)
+                wav_frame = AudioRawFrame(
+                    content.read(),
+                    sample_rate=frame.sample_rate,
+                    num_channels=frame.num_channels)
+                frame = wav_frame
+
+            payload = self._params.serializer.serialize(frame)
+            if payload:
+                await self._websocket.send_text(payload)
+
+            self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:]
+
+
+class FastAPIWebsocketTransport(BaseTransport):
+
+    def __init__(
+            self,
+            websocket: WebSocket,
+            params: FastAPIWebsocketParams = FastAPIWebsocketParams(),
+            input_name: str | None = None,
+            output_name: str | None = None,
+            loop: asyncio.AbstractEventLoop | None = None):
+        super().__init__(input_name=input_name, output_name=output_name, loop=loop)
+        self._params = params
+
+        self._callbacks = FastAPIWebsocketCallbacks(
+            on_client_connected=self._on_client_connected,
+            on_client_disconnected=self._on_client_disconnected
+        )
+
+        self._input = FastAPIWebsocketInputTransport(
+            websocket, self._params, self._callbacks, name=self._input_name)
+        self._output = FastAPIWebsocketOutputTransport(
+            websocket, self._params, name=self._output_name)
+
+        # Register supported handlers. The user will only be able to register
+        # these handlers.
+        self._register_event_handler("on_client_connected")
+        self._register_event_handler("on_client_disconnected")
+
+    def input(self) -> FrameProcessor:
+        return self._input
+
+    def output(self) -> FrameProcessor:
+        return self._output
+
+    async def _on_client_connected(self, websocket):
+        await self._call_event_handler("on_client_connected", websocket)
+
+    async def _on_client_disconnected(self, websocket):
+        await self._call_event_handler("on_client_disconnected", websocket)
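The output transport above accumulates raw audio, cuts it into fixed-size frames, and optionally wraps each frame in a WAV container before serializing it. A minimal standalone sketch of those two steps, using only the standard library, could look like the following. The helper names `split_into_frames` and `add_wav_header` are hypothetical and not part of pipecat; 16-bit samples are assumed, matching the `setsampwidth(2)` call in the diff.

```python
import io
import wave


def split_into_frames(buffer: bytes, frame_size: int) -> tuple[list[bytes], bytes]:
    """Cut `buffer` into `frame_size`-byte chunks and return (chunks, leftover)."""
    chunks = []
    while len(buffer) >= frame_size:
        chunks.append(buffer[:frame_size])
        buffer = buffer[frame_size:]
    return chunks, buffer


def add_wav_header(pcm: bytes, sample_rate: int, num_channels: int) -> bytes:
    """Wrap raw 16-bit PCM bytes in a WAV container (hypothetical helper)."""
    content = io.BytesIO()
    with wave.open(content, "wb") as ww:
        ww.setsampwidth(2)  # 2 bytes per sample, as in the transport above
        ww.setnchannels(num_channels)
        ww.setframerate(sample_rate)
        ww.writeframes(pcm)
    return content.getvalue()


if __name__ == "__main__":
    chunks, leftover = split_into_frames(b"\x00" * 10, frame_size=4)
    print(len(chunks), len(leftover))
    wav = add_wav_header(b"\x01\x00" * 160, sample_rate=16000, num_channels=1)
    print(wav[:4], len(wav))
```

The leftover bytes are what the transport keeps in `_audio_buffer` until the next call delivers enough data for another full frame.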
@@ -4,11 +4,9 @@
 # SPDX-License-Identifier: BSD 2-Clause License
 #

-
 import asyncio
 import io
 import wave
-import websockets

 from typing import Awaitable, Callable
 from pydantic.main import BaseModel
@@ -23,6 +21,13 @@ from pipecat.transports.base_transport import BaseTransport, TransportParams

 from loguru import logger

+try:
+    import websockets
+except ModuleNotFoundError as e:
+    logger.error(f"Exception: {e}")
+    logger.error("In order to use websockets, you need to `pip install pipecat-ai[websocket]`.")
+    raise Exception(f"Missing module: {e}")
+

 class WebsocketServerParams(TransportParams):
     add_wav_header: bool = False
@@ -88,7 +93,7 @@ class WebsocketServerInputTransport(BaseInputTransport):
                 continue

             if isinstance(frame, AudioRawFrame):
-                self.push_audio_frame(frame)
+                await self.push_audio_frame(frame)
             else:
                 await self._internal_push_frame(frame)

@@ -118,7 +123,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
             logger.warning("Only one client allowed, using new connection")
         self._websocket = websocket

-    def write_raw_audio_frames(self, frames: bytes):
+    async def write_raw_audio_frames(self, frames: bytes):
         self._audio_buffer += frames
         while len(self._audio_buffer) >= self._params.audio_frame_size:
             frame = AudioRawFrame(
@@ -144,9 +149,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):

             proto = self._params.serializer.serialize(frame)

-            future = asyncio.run_coroutine_threadsafe(
-                self._websocket.send(proto), self.get_event_loop())
-            future.result()
+            await self._websocket.send(proto)

             self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:]

@@ -6,11 +6,10 @@

 import aiohttp
 import asyncio
-import queue
 import time

 from dataclasses import dataclass
-from typing import Any, Callable, Mapping
+from typing import Any, Awaitable, Callable, Mapping
 from concurrent.futures import ThreadPoolExecutor

 from daily import (
@@ -108,19 +107,27 @@ class DailyParams(TransportParams):


 class DailyCallbacks(BaseModel):
-    on_joined: Callable[[Mapping[str, Any]], None]
-    on_left: Callable[[], None]
-    on_error: Callable[[str], None]
-    on_app_message: Callable[[Any, str], None]
-    on_call_state_updated: Callable[[str], None]
-    on_dialin_ready: Callable[[str], None]
-    on_dialout_connected: Callable[[Any], None]
-    on_dialout_stopped: Callable[[Any], None]
-    on_dialout_error: Callable[[Any], None]
-    on_dialout_warning: Callable[[Any], None]
-    on_first_participant_joined: Callable[[Mapping[str, Any]], None]
-    on_participant_joined: Callable[[Mapping[str, Any]], None]
-    on_participant_left: Callable[[Mapping[str, Any], str], None]
+    on_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
+    on_left: Callable[[], Awaitable[None]]
+    on_error: Callable[[str], Awaitable[None]]
+    on_app_message: Callable[[Any, str], Awaitable[None]]
+    on_call_state_updated: Callable[[str], Awaitable[None]]
+    on_dialin_ready: Callable[[str], Awaitable[None]]
+    on_dialout_answered: Callable[[Any], Awaitable[None]]
+    on_dialout_connected: Callable[[Any], Awaitable[None]]
+    on_dialout_stopped: Callable[[Any], Awaitable[None]]
+    on_dialout_error: Callable[[Any], Awaitable[None]]
+    on_dialout_warning: Callable[[Any], Awaitable[None]]
+    on_first_participant_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
+    on_participant_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
+    on_participant_left: Callable[[Mapping[str, Any], str], Awaitable[None]]
+
+
+def completion_callback(future):
+    def _callback(*args):
+        if not future.cancelled():
+            future.get_loop().call_soon_threadsafe(future.set_result, *args)
+    return _callback


 class DailyTransportClient(EventHandler):
@@ -160,7 +167,6 @@ class DailyTransportClient(EventHandler):
         self._joined = False
         self._joining = False
         self._leaving = False
-        self._sync_response = {k: queue.Queue() for k in ["join", "leave"]}

         self._executor = ThreadPoolExecutor(max_workers=5)

@@ -173,10 +179,16 @@ class DailyTransportClient(EventHandler):
             color_format=self._params.camera_out_color_format)

         self._mic: VirtualMicrophoneDevice = Daily.create_microphone_device(
-            "mic", sample_rate=self._params.audio_out_sample_rate, channels=self._params.audio_out_channels)
+            "mic",
+            sample_rate=self._params.audio_out_sample_rate,
+            channels=self._params.audio_out_channels,
+            non_blocking=True)

         self._speaker: VirtualSpeakerDevice = Daily.create_speaker_device(
-            "speaker", sample_rate=self._params.audio_in_sample_rate, channels=self._params.audio_in_channels)
+            "speaker",
+            sample_rate=self._params.audio_in_sample_rate,
+            channels=self._params.audio_in_channels,
+            non_blocking=True)
         Daily.select_speaker_device("speaker")

     @property
@@ -186,30 +198,39 @@ class DailyTransportClient(EventHandler):
     def set_callbacks(self, callbacks: DailyCallbacks):
         self._callbacks = callbacks

-    def send_message(self, frame: DailyTransportMessageFrame):
-        self._client.send_app_message(frame.message, frame.participant_id)
+    async def send_message(self, frame: DailyTransportMessageFrame):
+        future = self._loop.create_future()
+        self._client.send_app_message(
+            frame.message,
+            frame.participant_id,
+            completion=completion_callback(future))
+        await future

-    def read_next_audio_frame(self) -> AudioRawFrame | None:
+    async def read_next_audio_frame(self) -> AudioRawFrame | None:
         sample_rate = self._params.audio_in_sample_rate
         num_channels = self._params.audio_in_channels

         if self._other_participant_has_joined:
             num_frames = int(sample_rate / 100) * 2  # 20ms of audio

-            audio = self._speaker.read_frames(num_frames)
+            future = self._loop.create_future()
+            self._speaker.read_frames(num_frames, completion=completion_callback(future))
+            audio = await future

             return AudioRawFrame(audio=audio, sample_rate=sample_rate, num_channels=num_channels)
         else:
             # If no one has ever joined the meeting `read_frames()` would block,
             # instead we just wait a bit. daily-python should probably return
             # silence instead.
-            time.sleep(0.01)
+            await asyncio.sleep(0.01)
             return None

-    def write_raw_audio_frames(self, frames: bytes):
-        self._mic.write_frames(frames)
+    async def write_raw_audio_frames(self, frames: bytes):
+        future = self._loop.create_future()
+        self._mic.write_frames(frames, completion=completion_callback(future))
+        await future

-    def write_frame_to_camera(self, frame: ImageRawFrame):
+    async def write_frame_to_camera(self, frame: ImageRawFrame):
         self._camera.write_frame(frame.image)

     async def join(self):
@@ -217,13 +238,10 @@ class DailyTransportClient(EventHandler):
         if self._joined or self._joining:
             return

-        self._joining = True
-
-        await self._loop.run_in_executor(self._executor, self._join)
-
-    def _join(self):
         logger.info(f"Joining {self._room_url}")

+        self._joining = True
+
         # For performance reasons, never subscribe to video streams (unless a
         # video renderer is registered).
         self._client.update_subscription_profiles({
@@ -235,10 +253,42 @@ class DailyTransportClient(EventHandler):

         self._client.set_user_name(self._bot_name)

+        try:
+            (data, error) = await self._join()
+
+            if not error:
+                self._joined = True
+                self._joining = False
+
+                logger.info(f"Joined {self._room_url}")
+
+                if self._token and self._params.transcription_enabled:
+                    logger.info(
+                        f"Enabling transcription with settings {self._params.transcription_settings}")
+                    self._client.start_transcription(
+                        self._params.transcription_settings.model_dump())
+
+                await self._callbacks.on_joined(data["participants"]["local"])
+            else:
+                error_msg = f"Error joining {self._room_url}: {error}"
+                logger.error(error_msg)
+                await self._callbacks.on_error(error_msg)
+        except asyncio.TimeoutError:
+            error_msg = f"Time out joining {self._room_url}"
+            logger.error(error_msg)
+            await self._callbacks.on_error(error_msg)
+
+    async def _join(self):
+        future = self._loop.create_future()
+
+        def handle_join_response(data, error):
+            if not future.cancelled():
+                future.get_loop().call_soon_threadsafe(future.set_result, (data, error))
+
         self._client.join(
             self._room_url,
             self._token,
-            completion=self._call_joined,
+            completion=handle_join_response,
             client_settings={
                 "inputs": {
                     "camera": {
@@ -274,33 +324,7 @@ class DailyTransportClient(EventHandler):
                 },
             })

-        self._handle_join_response()
-
-    def _handle_join_response(self):
-        try:
-            (data, error) = self._sync_response["join"].get(timeout=10)
-            if not error:
-                self._joined = True
-                self._joining = False
-
-                logger.info(f"Joined {self._room_url}")
-
-                if self._token and self._params.transcription_enabled:
-                    logger.info(
-                        f"Enabling transcription with settings {self._params.transcription_settings}")
-                    self._client.start_transcription(
-                        self._params.transcription_settings.model_dump())
-
-                self._callbacks.on_joined(data["participants"]["local"])
-            else:
-                error_msg = f"Error joining {self._room_url}: {error}"
-                logger.error(error_msg)
-                self._callbacks.on_error(error_msg)
-            self._sync_response["join"].task_done()
-        except queue.Empty:
-            error_msg = f"Time out joining {self._room_url}"
-            logger.error(error_msg)
-            self._callbacks.on_error(error_msg)
+        return await asyncio.wait_for(future, timeout=10)

     async def leave(self):
         # Transport not joined, ignore.
@@ -310,34 +334,36 @@ class DailyTransportClient(EventHandler):
         self._joined = False
         self._leaving = True

-        await self._loop.run_in_executor(self._executor, self._leave)
-
-    def _leave(self):
         logger.info(f"Leaving {self._room_url}")

         if self._params.transcription_enabled:
             self._client.stop_transcription()

-        self._client.leave(completion=self._call_left)
-
-        self._handle_leave_response()
-
-    def _handle_leave_response(self):
         try:
-            error = self._sync_response["leave"].get(timeout=10)
+            error = await self._leave()
             if not error:
                 self._leaving = False
                 logger.info(f"Left {self._room_url}")
-                self._callbacks.on_left()
+                await self._callbacks.on_left()
             else:
                 error_msg = f"Error leaving {self._room_url}: {error}"
                 logger.error(error_msg)
-                self._callbacks.on_error(error_msg)
-            self._sync_response["leave"].task_done()
-        except queue.Empty:
+                await self._callbacks.on_error(error_msg)
+        except asyncio.TimeoutError:
             error_msg = f"Time out leaving {self._room_url}"
             logger.error(error_msg)
-            self._callbacks.on_error(error_msg)
+            await self._callbacks.on_error(error_msg)
+
+    async def _leave(self):
+        future = self._loop.create_future()
+
+        def handle_leave_response(error):
+            if not future.cancelled():
+                future.get_loop().call_soon_threadsafe(future.set_result, error)
+
+        self._client.leave(completion=handle_leave_response)
+
+        return await asyncio.wait_for(future, timeout=10)

     async def cleanup(self):
         await self._loop.run_in_executor(self._executor, self._cleanup)
@@ -399,25 +425,28 @@ class DailyTransportClient(EventHandler):
     #

     def on_app_message(self, message: Any, sender: str):
-        self._callbacks.on_app_message(message, sender)
+        self._call_async_callback(self._callbacks.on_app_message, message, sender)

     def on_call_state_updated(self, state: str):
-        self._callbacks.on_call_state_updated(state)
+        self._call_async_callback(self._callbacks.on_call_state_updated, state)

     def on_dialin_ready(self, sip_endpoint: str):
-        self._callbacks.on_dialin_ready(sip_endpoint)
+        self._call_async_callback(self._callbacks.on_dialin_ready, sip_endpoint)
+
+    def on_dialout_answered(self, data: Any):
+        self._call_async_callback(self._callbacks.on_dialout_answered, data)

     def on_dialout_connected(self, data: Any):
-        self._callbacks.on_dialout_connected(data)
+        self._call_async_callback(self._callbacks.on_dialout_connected, data)

     def on_dialout_stopped(self, data: Any):
-        self._callbacks.on_dialout_stopped(data)
+        self._call_async_callback(self._callbacks.on_dialout_stopped, data)

     def on_dialout_error(self, data: Any):
-        self._callbacks.on_dialout_error(data)
+        self._call_async_callback(self._callbacks.on_dialout_error, data)

     def on_dialout_warning(self, data: Any):
-        self._callbacks.on_dialout_warning(data)
+        self._call_async_callback(self._callbacks.on_dialout_warning, data)

     def on_participant_joined(self, participant):
         id = participant["id"]
@@ -425,15 +454,15 @@ class DailyTransportClient(EventHandler):

         if not self._other_participant_has_joined:
             self._other_participant_has_joined = True
-            self._callbacks.on_first_participant_joined(participant)
+            self._call_async_callback(self._callbacks.on_first_participant_joined, participant)

-        self._callbacks.on_participant_joined(participant)
+        self._call_async_callback(self._callbacks.on_participant_joined, participant)

     def on_participant_left(self, participant, reason):
         id = participant["id"]
         logger.info(f"Participant left {id}")

-        self._callbacks.on_participant_left(participant, reason)
+        self._call_async_callback(self._callbacks.on_participant_left, participant, reason)

     def on_transcription_message(self, message: Mapping[str, Any]):
         participant_id = ""
@@ -442,7 +471,7 @@ class DailyTransportClient(EventHandler):

         if participant_id in self._transcription_renderers:
             callback = self._transcription_renderers[participant_id]
-            callback(participant_id, message)
+            self._call_async_callback(callback, participant_id, message)

     def on_transcription_error(self, message):
         logger.error(f"Transcription error: {message}")
@@ -457,18 +486,19 @@ class DailyTransportClient(EventHandler):
     # Daily (CallClient callbacks)
     #

-    def _call_joined(self, data, error):
-        self._sync_response["join"].put((data, error))
-
-    def _call_left(self, error):
-        self._sync_response["leave"].put(error)
-
     def _video_frame_received(self, participant_id, video_frame):
         callback = self._video_renderers[participant_id]
-        callback(participant_id,
-                 video_frame.buffer,
-                 (video_frame.width, video_frame.height),
-                 video_frame.color_format)
+        self._call_async_callback(
+            callback,
+            participant_id,
+            video_frame.buffer,
+            (video_frame.width,
+             video_frame.height),
+            video_frame.color_format)
+
+    def _call_async_callback(self, callback, *args):
+        future = asyncio.run_coroutine_threadsafe(callback(*args), self._loop)
+        future.result()


 class DailyInputTransport(BaseInputTransport):
@@ -487,8 +517,6 @@ class DailyInputTransport(BaseInputTransport):
             num_channels=self._params.audio_in_channels)

     async def start(self, frame: StartFrame):
-        if self._running:
-            return
         # Parent start.
         await super().start(frame)
         # Join the room.
@@ -496,19 +524,17 @@ class DailyInputTransport(BaseInputTransport):
         # Create audio task. It reads audio frames from Daily and push them
         # internally for VAD processing.
         if self._params.audio_in_enabled or self._params.vad_enabled:
-            self._audio_in_thread = self._loop.run_in_executor(
-                self._executor, self._audio_in_thread_handler)
+            self._audio_in_task = self.get_event_loop().create_task(self._audio_in_task_handler())

     async def stop(self):
-        if not self._running:
-            return
-        # Parent stop. This will set _running to False.
+        # Parent stop.
         await super().stop()
         # Leave the room.
         await self._client.leave()
         # Stop audio thread.
         if self._params.audio_in_enabled or self._params.vad_enabled:
-            await self._audio_in_thread
+            self._audio_in_task.cancel()
+            await self._audio_in_task

     async def cleanup(self):
         await super().cleanup()
@@ -531,26 +557,25 @@ class DailyInputTransport(BaseInputTransport):
     # Frames
     #

-    def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
-        future = asyncio.run_coroutine_threadsafe(
-            self._internal_push_frame(frame), self.get_event_loop())
-        future.result()
+    async def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
+        await self._internal_push_frame(frame)

-    def push_app_message(self, message: Any, sender: str):
+    async def push_app_message(self, message: Any, sender: str):
         frame = DailyTransportMessageFrame(message=message, participant_id=sender)
-        future = asyncio.run_coroutine_threadsafe(
-            self._internal_push_frame(frame), self.get_event_loop())
-        future.result()
+        await self._internal_push_frame(frame)

     #
     # Audio in
     #

-    def _audio_in_thread_handler(self):
-        while self._running:
-            frame = self._client.read_next_audio_frame()
-            if frame:
-                self.push_audio_frame(frame)
+    async def _audio_in_task_handler(self):
+        while True:
+            try:
+                frame = await self._client.read_next_audio_frame()
+                if frame:
+                    await self.push_audio_frame(frame)
+            except asyncio.CancelledError:
+                break

     #
     # Camera in
@@ -580,7 +605,7 @@ class DailyInputTransport(BaseInputTransport):
         if participant_id in self._video_renderers:
             self._video_renderers[participant_id]["render_next_frame"] = True

-    def _on_participant_video_frame(self, participant_id: str, buffer, size, format):
+    async def _on_participant_video_frame(self, participant_id: str, buffer, size, format):
         render_frame = False

         curr_time = time.time()
@@ -600,9 +625,7 @@ class DailyInputTransport(BaseInputTransport):
                 image=buffer,
                 size=size,
                 format=format)
-            future = asyncio.run_coroutine_threadsafe(
-                self._internal_push_frame(frame), self.get_event_loop())
-            future.result()
+            await self._internal_push_frame(frame)

             self._video_renderers[participant_id]["timestamp"] = curr_time

@@ -615,17 +638,13 @@ class DailyOutputTransport(BaseOutputTransport):
         self._client = client

     async def start(self, frame: StartFrame):
-        if self._running:
-            return
         # Parent start.
         await super().start(frame)
         # Join the room.
         await self._client.join()

     async def stop(self):
-        if not self._running:
-            return
-        # Parent stop. This will set _running to False.
+        # Parent stop.
         await super().stop()
         # Leave the room.
         await self._client.leave()
@@ -634,10 +653,10 @@ class DailyOutputTransport(BaseOutputTransport):
         await super().cleanup()
         await self._client.cleanup()

-    def send_message(self, frame: DailyTransportMessageFrame):
-        self._client.send_message(frame)
+    async def send_message(self, frame: DailyTransportMessageFrame):
+        await self._client.send_message(frame)

-    def send_metrics(self, frame: MetricsFrame):
+    async def send_metrics(self, frame: MetricsFrame):
         ttfb = [{"name": n, "time": t} for n, t in frame.ttfb.items()]
         message = DailyTransportMessageFrame(message={
             "type": "pipecat-metrics",
@@ -645,13 +664,13 @@ class DailyOutputTransport(BaseOutputTransport):
                 "ttfb": ttfb
             },
         })
-        self._client.send_message(message)
+        await self._client.send_message(message)

-    def write_raw_audio_frames(self, frames: bytes):
-        self._client.write_raw_audio_frames(frames)
+    async def write_raw_audio_frames(self, frames: bytes):
+        await self._client.write_raw_audio_frames(frames)

-    def write_frame_to_camera(self, frame: ImageRawFrame):
-        self._client.write_frame_to_camera(frame)
+    async def write_frame_to_camera(self, frame: ImageRawFrame):
+        await self._client.write_frame_to_camera(frame)


 class DailyTransport(BaseTransport):
@@ -674,6 +693,7 @@ class DailyTransport(BaseTransport):
             on_app_message=self._on_app_message,
             on_call_state_updated=self._on_call_state_updated,
             on_dialin_ready=self._on_dialin_ready,
+            on_dialout_answered=self._on_dialout_answered,
             on_dialout_connected=self._on_dialout_connected,
             on_dialout_stopped=self._on_dialout_stopped,
             on_dialout_error=self._on_dialout_error,
@@ -696,6 +716,7 @@ class DailyTransport(BaseTransport):
         self._register_event_handler("on_app_message")
         self._register_event_handler("on_call_state_updated")
         self._register_event_handler("on_dialin_ready")
+        self._register_event_handler("on_dialout_answered")
         self._register_event_handler("on_dialout_connected")
         self._register_event_handler("on_dialout_stopped")
         self._register_event_handler("on_dialout_error")
@@ -768,24 +789,24 @@ class DailyTransport(BaseTransport):
             self._input.capture_participant_video(
                 participant_id, framerate, video_source, color_format)

-    def _on_joined(self, participant):
-        self._call_async_event_handler("on_joined", participant)
+    async def _on_joined(self, participant):
+        await self._call_event_handler("on_joined", participant)

-    def _on_left(self):
-        self._call_async_event_handler("on_left")
+    async def _on_left(self):
+        await self._call_event_handler("on_left")

-    def _on_error(self, error):
+    async def _on_error(self, error):
         # TODO(aleix): Report error to input/output transports. The one managing
         # the client should report the error.
         pass

-    def _on_app_message(self, message: Any, sender: str):
+    async def _on_app_message(self, message: Any, sender: str):
         if self._input:
-            self._input.push_app_message(message, sender)
-        self._call_async_event_handler("on_app_message", message, sender)
+            await self._input.push_app_message(message, sender)
+        await self._call_event_handler("on_app_message", message, sender)

-    def _on_call_state_updated(self, state: str):
-        self._call_async_event_handler("on_call_state_updated", state)
+    async def _on_call_state_updated(self, state: str):
+        await self._call_event_handler("on_call_state_updated", state)

     async def _handle_dialin_ready(self, sip_endpoint: str):
         if not self._params.dialin_settings:
@@ -818,33 +839,36 @@ class DailyTransport(BaseTransport):
             except BaseException as e:
                 logger.error(f"Error handling dialin-ready event ({url}): {e}")

-    def _on_dialin_ready(self, sip_endpoint):
+    async def _on_dialin_ready(self, sip_endpoint):
         if self._params.dialin_settings:
-            asyncio.run_coroutine_threadsafe(self._handle_dialin_ready(sip_endpoint), self._loop)
-        self._call_async_event_handler("on_dialin_ready", sip_endpoint)
+            await self._handle_dialin_ready(sip_endpoint)
+        await self._call_event_handler("on_dialin_ready", sip_endpoint)

-    def _on_dialout_connected(self, data):
-        self._call_async_event_handler("on_dialout_connected", data)
+    async def _on_dialout_answered(self, data):
+        await self._call_event_handler("on_dialout_answered", data)

-    def _on_dialout_stopped(self, data):
-        self._call_async_event_handler("on_dialout_stopped", data)
+    async def _on_dialout_connected(self, data):
+        await self._call_event_handler("on_dialout_connected", data)

-    def _on_dialout_error(self, data):
-        self._call_async_event_handler("on_dialout_error", data)
+    async def _on_dialout_stopped(self, data):
+        await self._call_event_handler("on_dialout_stopped", data)

-    def _on_dialout_warning(self, data):
-        self._call_async_event_handler("on_dialout_warning", data)
+    async def _on_dialout_error(self, data):
+        await self._call_event_handler("on_dialout_error", data)

-    def _on_participant_joined(self, participant):
-        self._call_async_event_handler("on_participant_joined", participant)
+    async def _on_dialout_warning(self, data):
+        await self._call_event_handler("on_dialout_warning", data)

-    def _on_participant_left(self, participant, reason):
-        self._call_async_event_handler("on_participant_left", participant, reason)
+    async def _on_participant_joined(self, participant):
+        await self._call_event_handler("on_participant_joined", participant)

-    def _on_first_participant_joined(self, participant):
-        self._call_async_event_handler("on_first_participant_joined", participant)
+    async def _on_participant_left(self, participant, reason):
+        await self._call_event_handler("on_participant_left", participant, reason)

-    def _on_transcription_message(self, participant_id, message):
+    async def _on_first_participant_joined(self, participant):
+        await self._call_event_handler("on_first_participant_joined", participant)
+
+    async def _on_transcription_message(self, participant_id, message):
         text = message["text"]
         timestamp = message["timestamp"]
         is_final = message["rawResponse"]["is_final"]
@@ -855,9 +879,4 @@ class DailyTransport(BaseTransport):
             frame = InterimTranscriptionFrame(text, participant_id, timestamp)

         if self._input:
-            self._input.push_transcription_frame(frame)
-
-    def _call_async_event_handler(self, event_name: str, *args, **kwargs):
-        future = asyncio.run_coroutine_threadsafe(
-            self._call_event_handler(event_name, *args, **kwargs), self._loop)
-        future.result()
+            await self._input.push_transcription_frame(frame)
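The Daily changes above replace blocking queue handoffs with a callback-to-future bridge: the SDK invokes a completion callback on its own thread, and `completion_callback` forwards the result to an asyncio future via `call_soon_threadsafe`. A self-contained sketch of that pattern follows. `fake_read_frames` is a hypothetical stand-in for a callback-based SDK call (it is not the daily-python API), and the one-second timeout is an arbitrary choice that mirrors the `asyncio.wait_for` used in `_join` and `_leave`.

```python
import asyncio
import threading


def completion_callback(future):
    """Return a callback that resolves `future` from any thread."""
    def _callback(*args):
        if not future.cancelled():
            # set_result must run on the loop's own thread.
            future.get_loop().call_soon_threadsafe(future.set_result, *args)
    return _callback


def fake_read_frames(num_frames, completion):
    """Hypothetical SDK call: delivers silence to `completion` on a worker thread."""
    def worker():
        completion(b"\x00" * (num_frames * 2))  # 16-bit mono silence
    threading.Thread(target=worker, daemon=True).start()


async def read_frames(num_frames, timeout=1.0):
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    fake_read_frames(num_frames, completion_callback(future))
    # Bound the wait so a missing callback cannot hang the caller forever.
    return await asyncio.wait_for(future, timeout=timeout)


if __name__ == "__main__":
    audio = asyncio.run(read_frames(160))
    print(len(audio))
```

Because the coroutine only awaits a future, the event loop stays free to run other tasks while the worker thread does its work, which is the point of dropping `run_in_executor` and `queue.Queue` in this diff.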
@@ -4,6 +4,7 @@
 # SPDX-License-Identifier: BSD 2-Clause License
 #

+import audioop
 import numpy as np
 import pyloudnorm as pyln

@@ -31,3 +32,23 @@ def calculate_audio_volume(audio: bytes, sample_rate: int) -> float:

 def exp_smoothing(value: float, prev_value: float, factor: float) -> float:
     return prev_value + factor * (value - prev_value)
+
+
+def ulaw_8000_to_pcm_16000(ulaw_8000_bytes):
+    # Convert μ-law to PCM
+    pcm_8000_bytes = audioop.ulaw2lin(ulaw_8000_bytes, 2)
+
+    # Resample from 8000 Hz to 16000 Hz
+    pcm_16000_bytes = audioop.ratecv(pcm_8000_bytes, 2, 1, 8000, 16000, None)[0]
+
+    return pcm_16000_bytes
+
+
+def pcm_16000_to_ulaw_8000(pcm_16000_bytes):
+    # Resample from 16000 Hz to 8000 Hz
+    pcm_8000_bytes = audioop.ratecv(pcm_16000_bytes, 2, 1, 16000, 8000, None)[0]
+
+    # Convert PCM to μ-law
+    ulaw_8000_bytes = audioop.lin2ulaw(pcm_8000_bytes, 2)
+
+    return ulaw_8000_bytes
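The helpers above rely on `audioop`, which was deprecated in Python 3.11 and removed in Python 3.13. As an illustration of what the `ulaw2lin` step computes, here is a pure-Python sketch of G.711 μ-law expansion to 16-bit PCM. The function names are hypothetical, the output is assumed to be little-endian, and resampling (the `ratecv` step) is not covered.

```python
import struct

BIAS = 0x84  # bias (132) that the mu-law encoder adds before compression


def ulaw_byte_to_pcm16(u: int) -> int:
    """Expand one mu-law byte to a signed 16-bit sample (hypothetical helper)."""
    u = ~u & 0xFF                 # mu-law bytes are stored bit-inverted
    sign = u & 0x80
    exponent = (u >> 4) & 0x07    # 3-bit segment number
    mantissa = u & 0x0F           # 4-bit step within the segment
    magnitude = (((mantissa << 3) + BIAS) << exponent) - BIAS
    return -magnitude if sign else magnitude


def ulaw_to_pcm16(ulaw_bytes: bytes) -> bytes:
    """Decode a mu-law byte string to little-endian 16-bit PCM."""
    samples = [ulaw_byte_to_pcm16(b) for b in ulaw_bytes]
    return struct.pack(f"<{len(samples)}h", *samples)


if __name__ == "__main__":
    print(ulaw_byte_to_pcm16(0xFF), ulaw_byte_to_pcm16(0x00))
    print(ulaw_to_pcm16(b"\xff\x80").hex())
```

Each input byte expands to two output bytes, so one second of 8000 Hz μ-law audio (8000 bytes) becomes 16000 bytes of PCM before any resampling.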