Compare commits

...

34 Commits

Author SHA1 Message Date
Jon Taylor
5bd5d22270 removed space from event handler 2024-06-26 18:30:56 +01:00
Jon Taylor
6ee7932337 added pause to start and new intro prompt 2024-06-26 18:24:14 +01:00
Jon Taylor
c407445dd1 removed header comment from bot runner 2024-06-24 17:35:26 +01:00
Jon Taylor
447f37167e added VAD stop seconds env 2024-06-24 17:34:25 +01:00
Jon Taylor
354c21500e prompt tweaks 2024-06-24 17:28:10 +01:00
Jon Taylor
5728e25b5a added fastbot example 2024-06-24 16:25:36 +01:00
Kwindla Hultman Kramer
0b6a19802f Merge pull request #250 from pipecat-ai/lewis/flush-tts-on-llm-response-end
Flush output from TTSService on LLMFullResponseEndFrame
2024-06-22 20:37:45 -04:00
Lewis Wolfgang
c4a2d2197c Flush output from TTSService on LLMFullResponseEndFrame
To cover cases when the LLM response does not end in punctuation.
2024-06-22 14:57:44 -04:00
Aleix Conchillo Flaqué
269d06aa15 Merge pull request #249 from pipecat-ai/aleix/pipecat-0.0.32
update CHANGELOG.md for 0.0.32
2024-06-22 09:21:21 -07:00
Aleix Conchillo Flaqué
dfef1f2c54 update CHANGELOG.md for 0.0.32 2024-06-22 09:19:22 -07:00
Aleix Conchillo Flaqué
b62beaba0b Merge pull request #248 from pipecat-ai/aleix/deepgramstt-url
services(deepgram): add url to DeepgramSTTService
2024-06-21 22:26:23 -07:00
Aleix Conchillo Flaqué
adf414e40f services(deepgram): add url to DeepgramSTTService 2024-06-21 16:52:28 -07:00
Aleix Conchillo Flaqué
dc64e57f63 Merge pull request #241 from pipecat-ai/aleix/transports-async
transports: fully use asyncio in all read/write operations
2024-06-21 16:00:08 -07:00
Aleix Conchillo Flaqué
d3e410b2ac transports: fully use asyncio in all read/write operations 2024-06-21 15:55:15 -07:00
Aleix Conchillo Flaqué
c544b2474b update linux-py3.10-requirements with fastapi and new daily-python 2024-06-21 15:44:01 -07:00
Aleix Conchillo Flaqué
18243de358 add fastapi and update macos-py3.10-requirements.txt 2024-06-21 13:16:47 -07:00
Aleix Conchillo Flaqué
6625895d1f update macos-py3.10-requirements.txt 2024-06-21 13:13:02 -07:00
Aleix Conchillo Flaqué
f9ecce739e Merge pull request #247 from pipecat-ai/aleix/twilio-updates
some twilio updates
2024-06-21 10:14:40 -07:00
Aleix Conchillo Flaqué
0075dd8386 update linux/macos-py3.10-requirements.txt 2024-06-21 09:48:12 -07:00
Aleix Conchillo Flaqué
eef1cde816 updated CHANGELOG.md with fastapi and twilio updates 2024-06-21 09:48:12 -07:00
Aleix Conchillo Flaqué
8d867c30c6 transports(websocket): verify websockets module 2024-06-21 09:48:12 -07:00
Aleix Conchillo Flaqué
42c668b7ae examples(twilio-chatbot): update instructions and renames 2024-06-21 09:48:12 -07:00
Aleix Conchillo Flaqué
b62227b4ae serializers(twilio): formatting and allow str | bytes | None 2024-06-21 09:47:17 -07:00
Aleix Conchillo Flaqué
25ef0cb87b serializers: allow str | bytes | None 2024-06-21 09:42:43 -07:00
Aleix Conchillo Flaqué
e195941aa5 Merge pull request #246 from pipecat-ai/aleix/daily-dialout-answered
transports(daily): added dialout_answered event
2024-06-20 18:37:24 -07:00
Aleix Conchillo Flaqué
e09eef1dd7 Merge pull request #243 from Viking5274/main
Add twilio_websocket_service with example
2024-06-20 14:09:48 -07:00
Aleix Conchillo Flaqué
7c13663a4e transports(daily): added dialout_answered event 2024-06-20 13:01:25 -07:00
daniil5701133
5753869e5e add twilio-chatbot example with README.md info how to start app
created twilio_websocket_service.py, TwilioFrameSerializer.py

moved pcm_16000_to_ulaw_8000 and ulaw_8000_to_pcm_16000 to src/pipecat/utils/audio.py
fixed callback on disconnect
2024-06-20 23:00:01 +03:00
chadbailey59
ba878a19f4 fixed "Dr." interruption (#245) 2024-06-19 20:53:04 -05:00
Aleix Conchillo Flaqué
55a9de78cd Merge pull request #239 from pipecat-ai/aleix/azure-stt
azure stt support
2024-06-14 14:07:07 +08:00
Aleix Conchillo Flaqué
ff51fc9091 updated CHANGELOG and README 2024-06-13 17:03:49 -07:00
Aleix Conchillo Flaqué
a4f857ee34 examples: use new AzureSTTService in 07f-interruptible-azure 2024-06-13 17:03:49 -07:00
Aleix Conchillo Flaqué
3250d74bef services(azure): new AzureSTTService 2024-06-13 17:03:49 -07:00
Aleix Conchillo Flaqué
c086160239 examples: cleanup some 07 interruptible examples 2024-06-13 16:36:10 -07:00
42 changed files with 2288 additions and 666 deletions

View File

@@ -5,6 +5,37 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.32] - 2024-06-22
### Added
- Allow specifying a `DeepgramSTTService` url which allows using on-prem
Deepgram.
- Added new `FastAPIWebsocketTransport`. This is a new websocket transport that
can be integrated with FastAPI websockets.
- Added new `TwilioFrameSerializer`. This is a new serializer that knows how to
serialize and deserialize audio frames from Twilio.
- Added Daily transport event: `on_dialout_answered`. See
https://reference-python.daily.co/api_reference.html#daily.EventHandler
- Added new `AzureSTTService`. This allows you to use Azure Speech-To-Text.
### Performance
- Convert `BaseInputTransport` and `BaseOutputTransport` to fully use asyncio
and remove the use of threads.
### Other
- Added `twilio-chatbot`. This is an example that shows how to integrate Twilio
phone numbers with a Pipecat bot.
- Updated `07f-interruptible-azure.py` to use `AzureLLMService`,
`AzureSTTService` and `AzureTTSService`.
## [0.0.31] - 2024-06-13
### Performance

View File

@@ -39,7 +39,7 @@ pip install "pipecat-ai[option,...]"
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
- **AI services**: `anthropic`, `azure`, `deepgram`, `google`, `fal`, `moondream`, `openai`, `playht`, `silero`, `whisper`
- **AI services**: `anthropic`, `azure`, `deepgram`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`
- **Transports**: `local`, `websocket`, `daily`
## Code examples

View File

@@ -2,6 +2,7 @@ autopep8~=2.1.0
build~=1.2.1
grpcio-tools~=1.62.2
pip-tools~=7.4.1
pyright~=1.1.367
pytest~=8.2.0
setuptools~=69.5.1
setuptools_scm~=8.1.0

View File

@@ -32,14 +32,15 @@ Next, follow the steps in the README for each demo.
## Projects:
| Project | Description | Services |
| -------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------- |
| [Simple Chatbot](simple-chatbot) | Basic voice-driven conversational bot. A good starting point for learning the flow of the framework. | Deepgram, OpenAI, Daily, Daily Prebuilt UI |
| [Storytelling Chatbot](storytelling-chatbot) | Stitches together multiple third-party services to create a collaborative storytime experience. | Deepgram, ElevenLabs, Open AI, Fal, Daily, Custom UI |
| [Translation Chatbot](translation-chatbot) | Listens for user speech, then translates that speech to Spanish and speaks the translation back. Demonstrates multi-participant use-cases. | Deepgram, Azure, OpenAI, Daily, Daily Prebuilt UI |
| [Moondream Chatbot](moondream-chatbot) | Demonstrates how to add vision capabilities to GPT4. **Note: works best with a GPU** | Deepgram, OpenAI, Moondream, Daily, Daily Prebuilt UI |
| Function-calling Chatbot (TBC) | A chatbot that can call functions in response to user input. | Deepgram, OpenAI, Fireworks, Daily, Daily Prebuilt UI |
| [Dialin Chatbot](dialin-chatbot) | A chatbot that connects to an incoming phone call from Daily or Twilio. | Deepgram, OpenAI, ElevenLabs, Daily, Twilio |
| Project | Description | Services |
|----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------|
| [Simple Chatbot](simple-chatbot) | Basic voice-driven conversational bot. A good starting point for learning the flow of the framework. | Deepgram, ElevenLabs, OpenAI, Daily, Daily Prebuilt UI |
| [Storytelling Chatbot](storytelling-chatbot) | Stitches together multiple third-party services to create a collaborative storytime experience. | Deepgram, ElevenLabs, OpenAI, Fal, Daily, Custom UI |
| [Translation Chatbot](translation-chatbot) | Listens for user speech, then translates that speech to Spanish and speaks the translation back. Demonstrates multi-participant use-cases. | Deepgram, Azure, OpenAI, Daily, Daily Prebuilt UI |
| [Moondream Chatbot](moondream-chatbot) | Demonstrates how to add vision capabilities to GPT4. **Note: works best with a GPU** | Deepgram, ElevenLabs, OpenAI, Moondream, Daily, Daily Prebuilt UI |
| [Patient intake](patient-intake) | A chatbot that can call functions in response to user input. | Deepgram, ElevenLabs, OpenAI, Daily, Daily Prebuilt UI |
| [Dialin Chatbot](dialin-chatbot) | A chatbot that connects to an incoming phone call from Daily or Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
| [Twilio Chatbot](twilio-chatbot) | A chatbot that connects to an incoming phone call from Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
> [!IMPORTANT]
> These example projects use Daily as a WebRTC transport and can be joined using their hosted Prebuilt UI.

165
examples/fast-chatbot/.gitignore vendored Normal file
View File

@@ -0,0 +1,165 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml
# custom script to recursively upgrade items in requirements.py
upgrade_requirements.py
.DS_Store

View File

@@ -0,0 +1,164 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from loguru import logger
import argparse
import asyncio
import aiohttp
import os
import sys
import time
from typing import Optional
from pydantic import BaseModel, ValidationError
from pipecat.vad.vad_analyzer import VADParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.pipeline import Pipeline
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator, LLMUserResponseAggregator
)
from helpers import (
    ClearableDeepgramTTSService,
    AudioVolumeTimer,
    TranscriptionTimingLogger
)
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "DEBUG"))
class BotSettings(BaseModel):
    room_url: str
    room_token: str
    bot_name: str = "Pipecat"
    prompt: Optional[str] = "You are a helpful assistant."
    deepgram_api_key: Optional[str] = os.getenv("DEEPGRAM_API_KEY", None)
    deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE", "aura-asteria-en")
    deepgram_tts_base_url: Optional[str] = os.getenv(
        "DEEPGRAM_TTS_BASE_URL", "https://api.deepgram.com/v1/speak")
    # NOTE: this fallback is the text-to-speech (/v1/speak) endpoint; set
    # DEEPGRAM_STT_BASE_URL to point at your speech-to-text endpoint.
    deepgram_stt_base_url: Optional[str] = os.getenv(
        "DEEPGRAM_STT_BASE_URL", "https://api.deepgram.com/v1/speak")
    # No trailing commas here: a trailing comma would make the default a tuple.
    openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY", None)
    openai_model: Optional[str] = os.getenv("OPENAI_MODEL", None)
    openai_base_url: Optional[str] = os.getenv("OPENAI_BASE_URL", None)
    # Environment values are strings, so convert explicitly.
    vad_stop_secs: Optional[float] = float(os.getenv("VAD_STOP_SECS", "0.2"))
async def main(settings: BotSettings):
    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            settings.room_url,
            settings.room_token,
            settings.bot_name,
            DailyParams(
                audio_out_enabled=True,
                transcription_enabled=False,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(params=VADParams(
                    stop_secs=settings.vad_stop_secs
                )),
                vad_audio_passthrough=True
            )
        )

        stt = DeepgramSTTService(
            name="STT",
            api_key=settings.deepgram_api_key,
            url=settings.deepgram_stt_base_url
        )

        tts = ClearableDeepgramTTSService(
            name="Voice",
            aiohttp_session=session,
            api_key=settings.deepgram_api_key,
            voice=settings.deepgram_voice,
            **({'base_url': url} if (url := settings.deepgram_tts_base_url) else {})
        )

        llm = OpenAILLMService(
            name="LLM",
            api_key=settings.openai_api_key,
            model=settings.openai_model,
            base_url=settings.openai_base_url,
        )

        messages = [
            {
                "role": "system",
                "content": settings.prompt,
            },
        ]

        avt = AudioVolumeTimer()
        tl = TranscriptionTimingLogger(avt)

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        pipeline = Pipeline([
            transport.input(),   # Transport user input
            avt,                 # Audio volume timer
            stt,                 # Speech-to-text
            tl,                  # Transcription timing logger
            tma_in,              # User responses
            llm,                 # LLM
            tts,                 # TTS
            transport.output(),  # Transport bot output
            tma_out,             # Assistant spoken responses
        ])

        task = PipelineTask(
            pipeline,
            PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                report_only_initial_ttfb=True
            ))

        # When the participant leaves, we exit the bot.
        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            await task.queue_frame(EndFrame())

        # When the first participant joins, the bot should introduce itself.
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            # Provide some air whilst tracks subscribe (non-blocking, so the
            # event loop keeps processing frames during the pause)
            await asyncio.sleep(2)
            messages.append(
                {
                    "role": "system",
                    "content": "Briefly introduce yourself by saying 'hello, I'm FastBot, how can I help you today?'"})
            await task.queue_frames([LLMMessagesFrame(messages)])

        runner = PipelineRunner()

        await runner.run(task)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Pipecat Bot")
    parser.add_argument("-s", "--settings", type=str, required=True, help="Pipecat bot settings")
    args, unknown = parser.parse_known_args()

    try:
        settings = BotSettings.model_validate_json(args.settings)
        asyncio.run(main(settings))
    except ValidationError as e:
        print(e)
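
The `on_first_participant_joined` handler above pauses briefly before the bot introduces itself. The following is a minimal sketch, independent of Pipecat, of why a pause inside a coroutine is usually written with `asyncio.sleep` rather than `time.sleep`: the former yields to the event loop, so other tasks keep running. The names `ticker` and `run_with_pause` are hypothetical, and `ticker` only stands in for background work such as audio frame processing.

```python
import asyncio
import time


async def ticker(ticks: list, interval: float = 0.01) -> None:
    # Stand-in for background work that should keep running during a pause.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def run_with_pause(blocking: bool, pause: float = 0.1) -> int:
    """Return how many times the ticker ran while the pause was in progress."""
    ticks: list = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # give the ticker a chance to start
    before = len(ticks)
    if blocking:
        time.sleep(pause)  # blocks the whole event loop
    else:
        await asyncio.sleep(pause)  # yields to other tasks
    during = len(ticks) - before
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return during


blocking_ticks = asyncio.run(run_with_pause(blocking=True))
async_ticks = asyncio.run(run_with_pause(blocking=False))
print(f"ticks during time.sleep: {blocking_ticks}, during asyncio.sleep: {async_ticks}")
```

In this sketch the ticker makes no progress during the blocking pause, while it keeps ticking during the awaited one.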

View File

@@ -0,0 +1,164 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import argparse
import subprocess
from pydantic import BaseModel, ValidationError
from typing import Optional
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from bot import BotSettings
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = ['DAILY_API_URL', 'DAILY_API_KEY', 'DEEPGRAM_API_KEY']
daily_rest_helper = DailyRESTHelper(
    os.getenv("DAILY_API_KEY", ""),
    os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))


class RunnerSettings(BaseModel):
    prompt: Optional[str] = "You are a fast, low-latency chatbot. Your goal is to demonstrate voice-driven AI capabilities at human-like speeds. When introducing yourself briefly mention your goal is to showcase speed and conversational flow. The technology powering you is Daily for transport, Cerebrium for GPU hosting, Llama 3 (8-B version) LLM, and Deepgram for speech-to-text and text-to-speech. You are hosted on the east coast of the United States. Respond to what the user said in a creative and helpful way, but keep responses short and legible. Ensure responses contain only words. Check again that you have not included special characters other than '?' or '!'."
    deepgram_voice: Optional[str] = os.getenv("DEEPGRAM_VOICE")
    openai_model: Optional[str] = os.getenv("OPENAI_MODEL", "gpt-4o")
    openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY")
    test: Optional[bool] = None
# ----------------- API ----------------- #
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)
# ----------------- Main ----------------- #
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
    runner_settings = RunnerSettings()

    try:
        request_body = await request.body()
        if len(request_body) > 0:
            runner_settings = RunnerSettings.model_validate_json(request_body)
    except ValidationError as e:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid request: {e}")
    except Exception:
        # If no data in request, pass
        pass

    # Is this a webhook creation request?
    if runner_settings.test is not None:
        return JSONResponse({"test": True})

    # Use specified room URL, or create a new one if not specified
    room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")

    if not room_url:
        params = DailyRoomParams(
            properties=DailyRoomProperties()
        )
        try:
            room: DailyRoomObject = daily_rest_helper.create_room(params=params)
        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"Unable to provision room {e}")
    else:
        # Check passed room URL exists, we should assume that it already has a sip set up
        try:
            room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
        except Exception:
            raise HTTPException(
                status_code=500, detail=f"Room not found: {room_url}")

    # Give the agent a token to join the session
    token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)

    if not room or not token:
        raise HTTPException(
            status_code=500, detail=f"Failed to get token for room: {room_url}")

    # Spawn a new agent, and join the user session
    try:
        bot_settings = BotSettings(
            room_url=room.url,
            room_token=token,
            prompt=runner_settings.prompt,
            deepgram_voice=runner_settings.deepgram_voice,
            openai_model=runner_settings.openai_model,
            openai_api_key=runner_settings.openai_api_key,
        )

        bot_settings_str = bot_settings.model_dump_json(exclude_none=True)

        # Pass the JSON as a single argv element instead of going through a
        # shell, so quotes in the (request-supplied) prompt cannot break or
        # inject into the command line.
        subprocess.Popen(
            ["python3", "-m", "bot", "-s", bot_settings_str],
            bufsize=1,
            cwd=os.path.dirname(os.path.abspath(__file__)))
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to start subprocess: {e}")

    # Grab a token for the user to join with
    user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)

    return JSONResponse({
        "room_url": room.url,
        "token": user_token,
    })
if __name__ == "__main__":
    # Check environment variables
    for env_var in REQUIRED_ENV_VARS:
        if env_var not in os.environ:
            raise Exception(f"Missing environment variable: {env_var}.")

    parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
    parser.add_argument("--host", type=str,
                        default=os.getenv("HOST", "0.0.0.0"), help="Host address")
    parser.add_argument("--port", type=int,
                        default=os.getenv("PORT", 7860), help="Port number")
    parser.add_argument("--reload", action="store_true",
                        default=True, help="Reload code on change")

    config = parser.parse_args()

    try:
        import uvicorn

        uvicorn.run(
            "bot_runner:app",
            host=config.host,
            port=config.port,
            reload=config.reload
        )
    except KeyboardInterrupt:
        print("Pipecat runner shutting down...")
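
The runner hands the bot its settings as a JSON string on the command line. The following is a minimal sketch, assuming nothing about Pipecat itself, of passing such a string as a single argv element with no shell involved, so quotes and shell metacharacters inside the prompt need no escaping. The child command is a hypothetical stand-in for `python3 -m bot -s ...`, and the sample prompt is made up.

```python
import json
import subprocess
import sys

# A prompt containing characters that a shell would otherwise interpret.
settings_json = json.dumps({"prompt": "Say 'hello' & don't use $SHELL tricks"})

# Stand-in child process: parses argv[1] as JSON and echoes the prompt back.
child_code = "import json, sys; print(json.loads(sys.argv[1])['prompt'])"

result = subprocess.run(
    [sys.executable, "-c", child_code, settings_json],  # argument list, no shell
    capture_output=True,
    text=True,
    check=True,
)
echoed_prompt = result.stdout.strip()
print(echoed_prompt)
```

Because the JSON travels as one list element, the child receives it byte-for-byte in `sys.argv[1]`, whatever the prompt contains.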

View File

@@ -0,0 +1,12 @@
DAILY_SAMPLE_ROOM_URL= #optional: use the same room each time, or create a new one if unset
DAILY_API_KEY=
DAILY_API_URL=
DEEPGRAM_API_KEY=
DEEPGRAM_VOICE=
DEEPGRAM_STT_BASE_URL=
DEEPGRAM_TTS_BASE_URL=
OPENAI_API_KEY=
OPENAI_MODEL=
OPENAI_BASE_URL=
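
Values read from the environment are always strings, and a blank entry such as `DEEPGRAM_VOICE=` may be loaded as an empty string rather than falling back to a default. The following is a small sketch, using a hypothetical helper `env_float`, of reading an optional numeric setting such as the `VAD_STOP_SECS` variable that `bot.py` consults:

```python
import os


def env_float(name: str, default: float) -> float:
    """Return the environment variable `name` as a float, or `default` if unset or blank."""
    raw = os.getenv(name, "")
    return float(raw) if raw.strip() else default


os.environ.pop("VAD_STOP_SECS", None)
unset_value = env_float("VAD_STOP_SECS", 0.2)  # variable not defined at all

os.environ["VAD_STOP_SECS"] = "0.35"
set_value = env_float("VAD_STOP_SECS", 0.2)  # explicit value wins

os.environ["VAD_STOP_SECS"] = ""  # e.g. a blank `VAD_STOP_SECS=` line
blank_value = env_float("VAD_STOP_SECS", 0.2)

print(unset_value, set_value, blank_value)
```

Treating a blank value like an unset one is a design choice in this sketch; a stricter variant could raise an error instead.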

View File

@@ -0,0 +1,267 @@
from loguru import logger
import asyncio
import math
import struct
import time
from dataclasses import dataclass, field
from typing import List
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
    Frame,
    AudioRawFrame,
    InterimTranscriptionFrame,
    TranscriptionFrame,
    TextFrame,
    StartInterruptionFrame,
    LLMFullResponseStartFrame,
    TTSStoppedFrame,
    MetricsFrame
)
from pipecat.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.services.deepgram import DeepgramTTSService
from pipecat.services.openai import OpenAILLMContext, OpenAILLMContextFrame
class GreedyLLMAggregator(FrameProcessor):
    def __init__(self, context: OpenAILLMContext = None, **kwargs):
        super().__init__(**kwargs)
        self.context: OpenAILLMContext = context if context else OpenAILLMContext()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        logger.debug(f"{frame}")
        try:
            if isinstance(frame, InterimTranscriptionFrame):
                return

            if isinstance(frame, TranscriptionFrame):
                # append transcribed text to last "user" frame
                if self.context.messages and self.context.messages[-1]["role"] == "user":
                    last_frame = self.context.messages.pop()
                else:
                    last_frame = {"role": "user", "content": ""}

                last_frame["content"] += " " + frame.text
                self.context.messages.append(last_frame)
                oai_context_frame = OpenAILLMContextFrame(context=self.context)
                logger.debug(f"pushing frame {oai_context_frame}")
                await self.push_frame(oai_context_frame)
                return

            await self.push_frame(frame, direction)
        except Exception as e:
            logger.debug(f"error: {e}")
class ClearableDeepgramTTSService(DeepgramTTSService):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        # Drop any partially aggregated sentence when the user interrupts.
        if isinstance(frame, StartInterruptionFrame):
            self._current_sentence = ""
@dataclass
class BufferedSentence:
    audio_frames: List[AudioRawFrame] = field(default_factory=list)
    text_frame: TextFrame = None
class VADGate(FrameProcessor):
    def __init__(
            self,
            vad_analyzer: VADAnalyzer = None,
            context: OpenAILLMContext = None,
            **kwargs):
        super().__init__(**kwargs)
        self.vad_analyzer = vad_analyzer
        self.context = context
        self._audio_pusher_task = None
        self._expect_text_frame_next = False
        self._sentences: List[BufferedSentence] = []

    # queue output from tts one sentence at a time. associate a buffer of audio frames with the content of
    # each text frame.
    #
    # start a coroutine to service the queue and send sentences down the pipeline when possible.
    # 1. do not send anything when we are not in VADState.QUIET
    # 2. if we are in VADState.QUIET, send a sentence, estimate how long it will take for that sentence
    #    to output, sleep until it's time to send another sentence
    # 3. each time we send a sentence, append it to the conversation context
    # 4. when the sentence buffer becomes empty, cancel the coroutine
    # 5. if we get a new LLMFullResponse, treat that as a cancellation, too

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        try:
            # A TTSService will emit a series of AudioRawFrame objects, then a TTSStoppedFrame,
            # then a TextFrame.
            if self._expect_text_frame_next:
                self._expect_text_frame_next = False
                if isinstance(frame, TextFrame):
                    # Buffered here; push_audio() sends it after the sentence's audio.
                    self._sentences[-1].text_frame = frame
                else:
                    logger.debug(f"expected a text frame, but received {frame}")
                    await self.push_frame(frame, direction)
                return
            else:
                if isinstance(frame, TextFrame):
                    logger.error("received a text frame, wasn't expecting it.")

            if isinstance(frame, AudioRawFrame):
                # if our buffer is empty or has a "finished" sentence at the end,
                # then we need to start buffering a new sentence
                if not self._sentences or self._sentences[-1].text_frame:
                    self._sentences.append(BufferedSentence())
                self._sentences[-1].audio_frames.append(frame)
                await self.maybe_start_audio_pusher_task()
                return

            if isinstance(frame, TTSStoppedFrame):
                self._expect_text_frame_next = True
                await self.push_frame(frame, direction)
                return

            # There are two ways we can be interrupted. During greedy inference, a new
            # LLM response can start. Or, during playout, we can get a traditional
            # user interruption frame.
            if (isinstance(frame, LLMFullResponseStartFrame) or
                    isinstance(frame, StartInterruptionFrame)):
                logger.debug(f"{frame} - Handle interruption in VADGate")
                self._sentences = []
                if self._audio_pusher_task:
                    self._audio_pusher_task.cancel()
                    self._audio_pusher_task = None
                await self.push_frame(frame, direction)
                return

            await self.push_frame(frame, direction)
        except Exception as e:
            logger.debug(f"error: {e}")

    async def maybe_start_audio_pusher_task(self):
        try:
            if self._audio_pusher_task:
                return
            self._audio_pusher_task = self.get_event_loop().create_task(self.push_audio())
        except Exception as e:
            logger.debug(f"Exception {e}")

    async def push_audio(self):
        try:
            while True:
                if not self._sentences:
                    await asyncio.sleep(0.01)
                    continue

                if self.vad_analyzer._vad_state != VADState.QUIET:
                    await asyncio.sleep(0.01)
                    continue

                # we only want to push completed sentence buffers
                if not self._sentences[0].text_frame:
                    await asyncio.sleep(0.01)
                    continue

                s = self._sentences.pop(0)
                if not s.audio_frames:
                    continue
                sample_rate = s.audio_frames[0].sample_rate
                duration = 0
                logger.debug(f"Pushing {len(s.audio_frames)} audio frames")
                for frame in s.audio_frames:
                    await self.push_frame(frame)
                    # assume linear16 encoding (2 bytes per sample). todo: add some more
                    # metadata to AudioRawFrame, maybe
                    duration += (len(frame.audio) / 2 / frame.num_channels) / sample_rate
                # wait roughly as long as the sentence takes to play out (minus 20 ms)
                await asyncio.sleep(duration - 20 / 1000)
                if self.context:
                    logger.debug(f"Appending assistant message to context: [{s.text_frame.text}]")
                    self.context.messages.append(
                        {"role": "assistant", "content": s.text_frame.text}
                    )
                await self.push_frame(s.text_frame)
        except Exception as e:
            logger.debug(f"Exception {e}")
class TranscriptionTimingLogger(FrameProcessor):
    def __init__(self, avt):
        super().__init__()
        self.name = "Transcription"
        self._avt = avt

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        try:
            await super().process_frame(frame, direction)
            if isinstance(frame, TranscriptionFrame):
                elapsed = time.time() - self._avt.last_transition_ts
                logger.debug(f"Transcription TTF: {elapsed}")
                await self.push_frame(MetricsFrame(ttfb={self.name: elapsed}))

            await self.push_frame(frame, direction)
        except Exception as e:
            logger.debug(f"Exception {e}")
class AudioVolumeTimer(FrameProcessor):
    def __init__(self):
        super().__init__()
        self.last_transition_ts = 0
        self._prev_volume = -80
        self._speech_volume_threshold = -50

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, AudioRawFrame):
            volume = self.calculate_volume(frame)
            # print(f"Audio volume: {volume:.2f} dB")
            if (volume >= self._speech_volume_threshold and
                    self._prev_volume < self._speech_volume_threshold):
                # logger.debug("transition above speech volume threshold")
                self.last_transition_ts = time.time()
            elif (volume < self._speech_volume_threshold and
                    self._prev_volume >= self._speech_volume_threshold):
                # logger.debug("transition below non-speech volume threshold")
                self.last_transition_ts = time.time()
            self._prev_volume = volume

        await self.push_frame(frame, direction)

    def calculate_volume(self, frame: AudioRawFrame) -> float:
        if frame.num_channels != 1:
            raise ValueError(f"Expected 1 channel, got {frame.num_channels}")

        # Unpack audio data into 16-bit integers
        fmt = f"{len(frame.audio) // 2}h"
        audio_samples = struct.unpack(fmt, frame.audio)

        # Calculate RMS
        sum_squares = sum(sample**2 for sample in audio_samples)
        rms = math.sqrt(sum_squares / len(audio_samples))

        # Convert RMS to decibels (dB)
        # Reference: maximum value for 16-bit audio is 32767
        if rms > 0:
            db = 20 * math.log10(rms / 32767)
        else:
            db = -96  # Minimum value (almost silent)

        return db
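
The RMS-to-decibel conversion in `calculate_volume` can be tried on raw bytes without any Pipecat types. The following is a minimal sketch, assuming 16-bit signed mono PCM in native byte order. The name `pcm16_dbfs` and the empty-buffer guard are additions for illustration and are not part of the helper above.

```python
import math
import struct


def pcm16_dbfs(audio: bytes, floor_db: float = -96.0) -> float:
    """RMS level of 16-bit signed PCM, in dB relative to full scale (32767)."""
    count = len(audio) // 2
    if count == 0:
        return floor_db  # nothing to measure; treat as silence
    samples = struct.unpack(f"{count}h", audio[: count * 2])
    rms = math.sqrt(sum(s * s for s in samples) / count)
    return 20 * math.log10(rms / 32767) if rms > 0 else floor_db


silence = struct.pack("4h", 0, 0, 0, 0)
full_scale = struct.pack("4h", 32767, -32767, 32767, -32767)
half_scale = struct.pack("4h", 16384, -16384, 16384, -16384)

silence_db = pcm16_dbfs(silence)
full_db = pcm16_dbfs(full_scale)
half_db = pcm16_dbfs(half_scale)
print(silence_db, full_db, round(half_db, 2))
```

A full-scale square wave sits at the 0 dB reference and halving the amplitude lowers the level by about 6 dB, which gives a feel for where the helper's -50 dB speech threshold lies.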

View File

@@ -0,0 +1,6 @@
pipecat-ai[daily,openai,silero,deepgram]
fastapi
uvicorn
requests
python-dotenv
loguru

View File

@@ -5,7 +5,6 @@
#
import asyncio
import aiohttp
import os
import sys
@@ -33,62 +32,61 @@ logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=44100,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=44100,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="British Lady",
output_format="pcm_44100"
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="British Lady",
output_format="pcm_44100"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":


@@ -5,7 +5,6 @@
#
import asyncio
import aiohttp
import os
import sys
@@ -19,7 +18,6 @@ from pipecat.services.playht import PlayHTTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.processors.logger import FrameLogger
from runner import configure
@@ -33,62 +31,61 @@ logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = PlayHTTTSService(
user_id=os.getenv("PLAYHT_USER_ID"),
api_key=os.getenv("PLAYHT_API_KEY"),
voice_url="s3://voice-cloning-zero-shot/801a663f-efd0-4254-98d0-5c175514c3e8/jennifer/manifest.json",
)
tts = PlayHTTTSService(
user_id=os.getenv("PLAYHT_USER_ID"),
api_key=os.getenv("PLAYHT_API_KEY"),
voice_url="s3://voice-cloning-zero-shot/801a663f-efd0-4254-98d0-5c175514c3e8/jennifer/manifest.json",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":


@@ -1,95 +0,0 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.azure import AzureTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = AzureTTSService(
api_key=os.getenv("AZURE_SPEECH_API_KEY"),
region=os.getenv("AZURE_SPEECH_REGION"),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))


@@ -0,0 +1,100 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import os
import sys

from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.azure import AzureLLMService, AzureSTTService, AzureTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

from runner import configure

from loguru import logger

from dotenv import load_dotenv
load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def main(room_url: str, token):
    transport = DailyTransport(
        room_url,
        token,
        "Respond bot",
        DailyParams(
            audio_out_enabled=True,
            audio_out_sample_rate=16000,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            vad_audio_passthrough=True,
        )
    )

    stt = AzureSTTService(
        api_key=os.getenv("AZURE_SPEECH_API_KEY"),
        region=os.getenv("AZURE_SPEECH_REGION"),
    )

    tts = AzureTTSService(
        api_key=os.getenv("AZURE_SPEECH_API_KEY"),
        region=os.getenv("AZURE_SPEECH_REGION"),
    )

    llm = AzureLLMService(
        api_key=os.getenv("AZURE_CHATGPT_API_KEY"),
        endpoint=os.getenv("AZURE_CHATGPT_ENDPOINT"),
        model=os.getenv("AZURE_CHATGPT_MODEL"),
    )

    messages = [
        {
            "role": "system",
            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        },
    ]

    tma_in = LLMUserResponseAggregator(messages)
    tma_out = LLMAssistantResponseAggregator(messages)

    pipeline = Pipeline([
        transport.input(),   # Transport user input
        stt,                 # STT
        tma_in,              # User responses
        llm,                 # LLM
        tts,                 # TTS
        transport.output(),  # Transport bot output
        tma_out              # Assistant spoken responses
    ])

    task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        transport.capture_participant_transcription(participant["id"])
        # Kick off the conversation.
        messages.append(
            {"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([LLMMessagesFrame(messages)])

    runner = PipelineRunner()

    await runner.run(task)


if __name__ == "__main__":
    (url, token) = configure()
    asyncio.run(main(url, token))


@@ -5,7 +5,6 @@
#
import asyncio
import aiohttp
import os
import sys
@@ -32,61 +31,60 @@ logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=24000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = OpenAITTSService(
api_key=os.getenv("OPENAI_API_KEY"),
voice="alloy"
)
tts = OpenAITTSService(
api_key=os.getenv("OPENAI_API_KEY"),
voice="alloy"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
runner = PipelineRunner()
await runner.run(task)
await runner.run(task)
if __name__ == "__main__":

examples/twilio-chatbot/.gitignore

@@ -0,0 +1,161 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
runpod.toml


@@ -0,0 +1,20 @@
# Use an official Python runtime as a parent image
FROM python:3.10-bullseye
# Set the working directory in the container
WORKDIR /twilio-chatbot
# Copy the requirements file into the container
COPY requirements.txt .
# Install any needed packages specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Copy the current directory contents into the container
COPY . .
# Expose the desired port
EXPOSE 8765
# Run the application
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8765"]


@@ -0,0 +1,82 @@
# Twilio Chatbot
This project is a FastAPI-based chatbot that integrates with Twilio to handle WebSocket connections and provide real-time communication. The project includes endpoints for starting a call and handling WebSocket connections.
## Table of Contents
- [Features](#features)
- [Requirements](#requirements)
- [Installation](#installation)
- [Configure Twilio URLs](#configure-twilio-urls)
- [Running the Application](#running-the-application)
- [Usage](#usage)
## Features
- **FastAPI**: A modern, high-performance web framework for building APIs with Python.
- **WebSocket Support**: Real-time communication using WebSockets.
- **CORS Middleware**: Allows cross-origin requests for testing.
- **Dockerized**: Easily deployable using Docker.
## Requirements
- Python 3.10
- Docker (for containerized deployment)
- ngrok (for tunneling)
- Twilio Account
## Installation
1. **Set up a virtual environment** (optional but recommended):
```sh
python -m venv venv
source venv/bin/activate # On Windows, use `venv\Scripts\activate`
```
2. **Install dependencies**:
```sh
pip install -r requirements.txt
```
3. **Create `.env`**:
Copy `env.example` to `.env` and fill in the API keys it lists.
4. **Install ngrok**:
Follow the instructions on the [ngrok website](https://ngrok.com/download) to download and install ngrok.
## Configure Twilio URLs
1. **Update the Twilio Webhook**:
Start ngrok first (see [Running the Application](#running-the-application)), then copy the ngrok URL and set your Twilio phone number's webhook URL to `http://<ngrok_url>/start_call`.
2. **Update streams.xml**:
Copy the same ngrok URL into `templates/streams.xml` as `wss://<ngrok_url>/ws`.
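
As an alternative to editing the template by hand, the same TwiML could be generated from the tunnel host name. This is only a sketch, not part of this example's code: `build_twiml` is a hypothetical helper, and the element names and the 40-second pause are copied from `templates/streams.xml`.

```python
import xml.etree.ElementTree as ET


def build_twiml(host: str, pause_seconds: int = 40) -> str:
    """Build the <Response> document that points Twilio at wss://<host>/ws.

    `host` is the public host name of the tunnel, without a scheme
    (hypothetical parameter, not used by the example server).
    """
    response = ET.Element("Response")
    connect = ET.SubElement(response, "Connect")
    ET.SubElement(connect, "Stream", url=f"wss://{host}/ws")
    ET.SubElement(response, "Pause", length=str(pause_seconds))
    body = ET.tostring(response, encoding="unicode")
    return '<?xml version="1.0" encoding="UTF-8"?>\n' + body


if __name__ == "__main__":
    # "example.ngrok.app" is a placeholder host, not a real tunnel.
    print(build_twiml("example.ngrok.app"))
```

Building the document with `xml.etree.ElementTree` rather than string formatting keeps the attribute values escaped correctly.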
## Running the Application
### Using Python
1. **Run the FastAPI application**:
```sh
python server.py
```
2. **Start ngrok**:
In a new terminal, start ngrok to tunnel the local server:
```sh
ngrok http 8765
```
### Using Docker
1. **Build the Docker image**:
```sh
docker build -t twilio-chatbot .
```
2. **Run the Docker container**:
```sh
docker run -it --rm -p 8765:8765 twilio-chatbot
```
## Usage
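To start a call, dial your Twilio phone number. Twilio sends the webhook request to `/start_call` on your FastAPI application, which returns the TwiML from `templates/streams.xml`; that TwiML connects the call audio to the `/ws` WebSocket endpoint, where the bot runs.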


@@ -0,0 +1,88 @@
import aiohttp
import os
import sys

from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator,
    LLMUserResponseAggregator
)
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketTransport, FastAPIWebsocketParams
from pipecat.vad.silero import SileroVADAnalyzer

from loguru import logger

from dotenv import load_dotenv
load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def run_bot(websocket_client):
    async with aiohttp.ClientSession() as session:
        transport = FastAPIWebsocketTransport(
            websocket=websocket_client,
            params=FastAPIWebsocketParams(
                audio_out_enabled=True,
                add_wav_header=False,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                vad_audio_passthrough=True
            )
        )

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4o")

        stt = DeepgramSTTService(api_key=os.getenv('DEEPGRAM_API_KEY'))

        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY"),
            voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
        )

        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        pipeline = Pipeline([
            transport.input(),   # Websocket input from client
            stt,                 # Speech-To-Text
            tma_in,              # User responses
            llm,                 # LLM
            tts,                 # Text-To-Speech
            transport.output(),  # Websocket output to client
            tma_out              # LLM responses
        ])

        task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))

        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            # Kick off the conversation.
            messages.append(
                {"role": "system", "content": "Please introduce yourself to the user."})
            await task.queue_frames([LLMMessagesFrame(messages)])

        @transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, client):
            await task.queue_frames([EndFrame()])

        runner = PipelineRunner(handle_sigint=False)

        await runner.run(task)


@@ -0,0 +1,4 @@
OPENAI_API_KEY=
DEEPGRAM_API_KEY=
ELEVENLABS_API_KEY=
ELEVENLABS_VOICE_ID=


@@ -0,0 +1,5 @@
pipecat-ai[daily,openai,silero,deepgram]
fastapi
uvicorn
python-dotenv
loguru


@@ -0,0 +1,34 @@
import uvicorn

from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import HTMLResponse

from bot import run_bot

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for testing
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post('/start_call')
async def start_call():
    print("POST TwiML")
    # Read the TwiML template with a context manager so the file handle is closed.
    with open("templates/streams.xml") as f:
        return HTMLResponse(content=f.read(), media_type="application/xml")


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    print("WebSocket connection accepted")
    await run_bot(websocket)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8765)


@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="wss://<your server url>/ws"></Stream>
</Connect>
<Pause length="40"/>
</Response>


@@ -26,6 +26,8 @@ anyio==4.4.0
# anthropic
# httpx
# openai
# starlette
# watchfiles
async-timeout==4.0.3
# via
# aiohttp
@@ -54,12 +56,15 @@ cffi==1.16.0
charset-normalizer==3.3.2
# via requests
click==8.1.7
# via flask
# via
# flask
# typer
# uvicorn
coloredlogs==15.0.1
# via onnxruntime
ctranslate2==4.3.1
# via faster-whisper
daily-python==0.9.1
daily-python==0.10.0
# via pipecat-ai (pyproject.toml)
dataclasses-json==0.6.7
# via
@@ -71,17 +76,25 @@ distro==1.9.0
# via
# anthropic
# openai
dnspython==2.6.1
# via email-validator
einops==0.8.0
# via pipecat-ai (pyproject.toml)
email-validator==2.2.0
# via fastapi
exceptiongroup==1.2.1
# via
# anyio
# pytest
fal-client==0.4.0
# via pipecat-ai (pyproject.toml)
fastapi==0.111.0
# via pipecat-ai (pyproject.toml)
fastapi-cli==0.0.4
# via fastapi
faster-whisper==1.0.2
# via pipecat-ai (pyproject.toml)
filelock==3.15.1
filelock==3.15.3
# via
# huggingface-hub
# pyht
@@ -113,7 +126,7 @@ google-api-core[grpc]==2.19.0
# google-ai-generativelanguage
# google-api-python-client
# google-generativeai
google-api-python-client==2.133.0
google-api-python-client==2.134.0
# via google-generativeai
google-auth==2.30.0
# via
@@ -140,24 +153,29 @@ grpcio==1.64.1
grpcio-status==1.62.2
# via google-api-core
h11==0.14.0
# via httpcore
# via
# httpcore
# uvicorn
httpcore==1.0.5
# via httpx
httplib2==0.22.0
# via
# google-api-python-client
# google-auth-httplib2
httptools==0.6.1
# via uvicorn
httpx==0.27.0
# via
# anthropic
# cartesia
# deepgram-sdk
# fal-client
# fastapi
# openai
# openpipe
httpx-sse==0.4.0
# via fal-client
huggingface-hub==0.23.3
huggingface-hub==0.23.4
# via
# faster-whisper
# timm
@@ -168,6 +186,7 @@ humanfriendly==10.0
idna==3.7
# via
# anyio
# email-validator
# httpx
# requests
# yarl
@@ -177,41 +196,46 @@ itsdangerous==2.2.0
# via flask
jinja2==3.1.4
# via
# fastapi
# flask
# torch
jsonpatch==1.33
# via langchain-core
jsonpointer==3.0.0
# via jsonpatch
langchain==0.2.3
langchain==0.2.5
# via
# langchain-community
# pipecat-ai (pyproject.toml)
langchain-community==0.2.4
langchain-community==0.2.5
# via pipecat-ai (pyproject.toml)
langchain-core==0.2.5
langchain-core==0.2.9
# via
# langchain
# langchain-community
# langchain-openai
# langchain-text-splitters
langchain-openai==0.1.8
langchain-openai==0.1.9
# via pipecat-ai (pyproject.toml)
langchain-text-splitters==0.2.1
# via langchain
langsmith==0.1.77
langsmith==0.1.81
# via
# langchain
# langchain-community
# langchain-core
loguru==0.7.2
# via pipecat-ai (pyproject.toml)
markdown-it-py==3.0.0
# via rich
markupsafe==2.1.5
# via
# jinja2
# werkzeug
marshmallow==3.21.3
# via dataclasses-json
mdurl==0.1.2
# via markdown-it-py
mpmath==1.3.0
# via sympy
multidict==6.0.5
@@ -273,9 +297,11 @@ openai==1.26.0
# pipecat-ai (pyproject.toml)
openpipe==4.14.0
# via pipecat-ai (pyproject.toml)
orjson==3.10.4
# via langsmith
packaging==23.2
orjson==3.10.5
# via
# fastapi
# langsmith
packaging==24.1
# via
# huggingface-hub
# langchain-core
@@ -289,7 +315,7 @@ pillow==10.3.0
# torchvision
pluggy==1.5.0
# via pytest
proto-plus==1.23.0
proto-plus==1.24.0
# via
# google-ai-generativelanguage
# google-api-core
@@ -317,6 +343,7 @@ pycparser==2.22
pydantic==2.7.4
# via
# anthropic
# fastapi
# google-generativeai
# langchain
# langchain-core
@@ -324,6 +351,8 @@ pydantic==2.7.4
# openai
pydantic-core==2.18.4
# via pydantic
pygments==2.18.0
# via rich
pyht==0.0.28
# via pipecat-ai (pyproject.toml)
pyloudnorm==0.1.1
@@ -337,7 +366,11 @@ pytest-asyncio==0.23.7
python-dateutil==2.9.0.post0
# via openpipe
python-dotenv==1.0.1
# via pipecat-ai (pyproject.toml)
# via
# pipecat-ai (pyproject.toml)
# uvicorn
python-multipart==0.0.9
# via fastapi
pyyaml==6.0.1
# via
# ctranslate2
@@ -347,6 +380,7 @@ pyyaml==6.0.1
# langchain-core
# timm
# transformers
# uvicorn
regex==2024.5.15
# via
# tiktoken
@@ -362,6 +396,8 @@ requests==2.32.3
# pyht
# tiktoken
# transformers
rich==13.7.1
# via typer
rsa==4.9
# via google-auth
safetensors==0.4.3
@@ -370,6 +406,8 @@ safetensors==0.4.3
# transformers
scipy==1.13.1
# via pyloudnorm
shellingham==1.5.4
# via typer
six==1.16.0
# via python-dateutil
sniffio==1.3.1
@@ -380,15 +418,17 @@ sniffio==1.3.1
# openai
sounddevice==0.4.7
# via pipecat-ai (pyproject.toml)
sqlalchemy==2.0.30
sqlalchemy==2.0.31
# via
# langchain
# langchain-community
starlette==0.37.2
# via fastapi
sympy==1.12.1
# via
# onnxruntime
# torch
tenacity==8.3.0
tenacity==8.4.1
# via
# langchain
# langchain-community
@@ -424,11 +464,14 @@ transformers==4.40.2
# via pipecat-ai (pyproject.toml)
triton==2.3.1
# via torch
typer==0.12.3
# via fastapi-cli
typing-extensions==4.12.2
# via
# anthropic
# anyio
# deepgram-sdk
# fastapi
# google-generativeai
# huggingface-hub
# openai
@@ -437,20 +480,31 @@ typing-extensions==4.12.2
# pydantic-core
# sqlalchemy
# torch
# typer
# typing-inspect
# uvicorn
typing-inspect==0.9.0
# via dataclasses-json
ujson==5.10.0
# via fastapi
uritemplate==4.1.1
# via google-api-python-client
urllib3==2.2.1
urllib3==2.2.2
# via requests
uvicorn[standard]==0.30.1
# via fastapi
uvloop==0.19.0
# via uvicorn
verboselogs==1.7
# via deepgram-sdk
watchfiles==0.22.0
# via uvicorn
websockets==12.0
# via
# cartesia
# deepgram-sdk
# pipecat-ai (pyproject.toml)
# uvicorn
werkzeug==3.0.3
# via flask
yarl==1.9.4


@@ -1,5 +1,5 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile --all-extras pyproject.toml
@@ -26,10 +26,8 @@ anyio==4.4.0
# anthropic
# httpx
# openai
async-timeout==4.0.3
# via
# aiohttp
# langchain
# starlette
# watchfiles
attrs==23.2.0
# via
# aiohttp
@@ -54,12 +52,15 @@ cffi==1.16.0
charset-normalizer==3.3.2
# via requests
click==8.1.7
# via flask
# via
# flask
# typer
# uvicorn
coloredlogs==15.0.1
# via onnxruntime
ctranslate2==4.3.1
# via faster-whisper
daily-python==0.9.1
daily-python==0.10.0
# via pipecat-ai (pyproject.toml)
dataclasses-json==0.6.7
# via
@@ -71,17 +72,21 @@ distro==1.9.0
# via
# anthropic
# openai
dnspython==2.6.1
# via email-validator
einops==0.8.0
# via pipecat-ai (pyproject.toml)
exceptiongroup==1.2.1
# via
# anyio
# pytest
email-validator==2.2.0
# via fastapi
fal-client==0.4.0
# via pipecat-ai (pyproject.toml)
fastapi==0.111.0
# via pipecat-ai (pyproject.toml)
fastapi-cli==0.0.4
# via fastapi
faster-whisper==1.0.2
# via pipecat-ai (pyproject.toml)
filelock==3.15.1
filelock==3.15.3
# via
# huggingface-hub
# pyht
@@ -112,7 +117,7 @@ google-api-core[grpc]==2.19.0
# google-ai-generativelanguage
# google-api-python-client
# google-generativeai
google-api-python-client==2.133.0
google-api-python-client==2.134.0
# via google-generativeai
google-auth==2.30.0
# via
@@ -137,24 +142,29 @@ grpcio==1.64.1
grpcio-status==1.62.2
# via google-api-core
h11==0.14.0
# via httpcore
# via
# httpcore
# uvicorn
httpcore==1.0.5
# via httpx
httplib2==0.22.0
# via
# google-api-python-client
# google-auth-httplib2
httptools==0.6.1
# via uvicorn
httpx==0.27.0
# via
# anthropic
# cartesia
# deepgram-sdk
# fal-client
# fastapi
# openai
# openpipe
httpx-sse==0.4.0
# via fal-client
huggingface-hub==0.23.3
huggingface-hub==0.23.4
# via
# faster-whisper
# timm
@@ -165,6 +175,7 @@ humanfriendly==10.0
idna==3.7
# via
# anyio
# email-validator
# httpx
# requests
# yarl
@@ -174,41 +185,46 @@ itsdangerous==2.2.0
# via flask
jinja2==3.1.4
# via
# fastapi
# flask
# torch
jsonpatch==1.33
# via langchain-core
jsonpointer==3.0.0
# via jsonpatch
langchain==0.2.3
langchain==0.2.5
# via
# langchain-community
# pipecat-ai (pyproject.toml)
langchain-community==0.2.4
langchain-community==0.2.5
# via pipecat-ai (pyproject.toml)
langchain-core==0.2.5
langchain-core==0.2.9
# via
# langchain
# langchain-community
# langchain-openai
# langchain-text-splitters
langchain-openai==0.1.8
langchain-openai==0.1.9
# via pipecat-ai (pyproject.toml)
langchain-text-splitters==0.2.1
# via langchain
langsmith==0.1.77
langsmith==0.1.81
# via
# langchain
# langchain-community
# langchain-core
loguru==0.7.2
# via pipecat-ai (pyproject.toml)
markdown-it-py==3.0.0
# via rich
markupsafe==2.1.5
# via
# jinja2
# werkzeug
marshmallow==3.21.3
# via dataclasses-json
mdurl==0.1.2
# via markdown-it-py
mpmath==1.3.0
# via sympy
multidict==6.0.5
@@ -239,9 +255,11 @@ openai==1.26.0
# pipecat-ai (pyproject.toml)
openpipe==4.14.0
# via pipecat-ai (pyproject.toml)
orjson==3.10.4
# via langsmith
packaging==23.2
orjson==3.10.5
# via
# fastapi
# langsmith
packaging==24.1
# via
# huggingface-hub
# langchain-core
@@ -255,7 +273,7 @@ pillow==10.3.0
# torchvision
pluggy==1.5.0
# via pytest
proto-plus==1.23.0
proto-plus==1.24.0
# via
# google-ai-generativelanguage
# google-api-core
@@ -283,6 +301,7 @@ pycparser==2.22
pydantic==2.7.4
# via
# anthropic
# fastapi
# google-generativeai
# langchain
# langchain-core
@@ -290,6 +309,8 @@ pydantic==2.7.4
# openai
pydantic-core==2.18.4
# via pydantic
pygments==2.18.0
# via rich
pyht==0.0.28
# via pipecat-ai (pyproject.toml)
pyloudnorm==0.1.1
@@ -303,7 +324,11 @@ pytest-asyncio==0.23.7
python-dateutil==2.9.0.post0
# via openpipe
python-dotenv==1.0.1
# via pipecat-ai (pyproject.toml)
# via
# pipecat-ai (pyproject.toml)
# uvicorn
python-multipart==0.0.9
# via fastapi
pyyaml==6.0.1
# via
# ctranslate2
@@ -313,6 +338,7 @@ pyyaml==6.0.1
# langchain-core
# timm
# transformers
# uvicorn
regex==2024.5.15
# via
# tiktoken
@@ -328,6 +354,8 @@ requests==2.32.3
# pyht
# tiktoken
# transformers
rich==13.7.1
# via typer
rsa==4.9
# via google-auth
safetensors==0.4.3
@@ -336,6 +364,8 @@ safetensors==0.4.3
# transformers
scipy==1.13.1
# via pyloudnorm
shellingham==1.5.4
# via typer
six==1.16.0
# via python-dateutil
sniffio==1.3.1
@@ -346,15 +376,17 @@ sniffio==1.3.1
# openai
sounddevice==0.4.7
# via pipecat-ai (pyproject.toml)
sqlalchemy==2.0.30
sqlalchemy==2.0.31
# via
# langchain
# langchain-community
starlette==0.37.2
# via fastapi
sympy==1.12.1
# via
# onnxruntime
# torch
tenacity==8.3.0
tenacity==8.4.1
# via
# langchain
# langchain-community
@@ -368,8 +400,6 @@ tokenizers==0.19.1
# anthropic
# faster-whisper
# transformers
tomli==2.0.1
# via pytest
torch==2.3.1
# via
# pipecat-ai (pyproject.toml)
@@ -388,11 +418,13 @@ tqdm==4.66.4
# transformers
transformers==4.40.2
# via pipecat-ai (pyproject.toml)
typer==0.12.3
# via fastapi-cli
typing-extensions==4.12.2
# via
# anthropic
# anyio
# deepgram-sdk
# fastapi
# google-generativeai
# huggingface-hub
# openai
@@ -401,20 +433,30 @@ typing-extensions==4.12.2
# pydantic-core
# sqlalchemy
# torch
# typer
# typing-inspect
typing-inspect==0.9.0
# via dataclasses-json
ujson==5.10.0
# via fastapi
uritemplate==4.1.1
# via google-api-python-client
urllib3==2.2.1
urllib3==2.2.2
# via requests
uvicorn[standard]==0.30.1
# via fastapi
uvloop==0.19.0
# via uvicorn
verboselogs==1.7
# via deepgram-sdk
watchfiles==0.22.0
# via uvicorn
websockets==12.0
# via
# cartesia
# deepgram-sdk
# pipecat-ai (pyproject.toml)
# uvicorn
werkzeug==3.0.3
# via flask
yarl==1.9.4


@@ -37,7 +37,7 @@ Website = "https://pipecat.ai"
anthropic = [ "anthropic~=0.25.7" ]
azure = [ "azure-cognitiveservices-speech~=1.37.0" ]
cartesia = [ "numpy~=1.26.0", "sounddevice", "cartesia" ]
daily = [ "daily-python~=0.9.0" ]
daily = [ "daily-python~=0.10.0" ]
deepgram = [ "deepgram-sdk~=3.2.7" ]
examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.0" ]
@@ -50,7 +50,7 @@ openai = [ "openai~=1.26.0" ]
openpipe = [ "openpipe~=4.14.0" ]
playht = [ "pyht~=0.0.28" ]
silero = [ "torch~=2.3.0", "torchaudio~=2.3.0" ]
websocket = [ "websockets~=12.0" ]
websocket = [ "websockets~=12.0", "fastapi~=0.111.0" ]
whisper = [ "faster-whisper~=1.0.2" ]
[tool.setuptools.packages.find]


@@ -71,6 +71,8 @@ class PipelineTask:
await self._source.process_frame(CancelFrame(), FrameDirection.DOWNSTREAM)
self._process_down_task.cancel()
self._process_up_task.cancel()
await self._process_down_task
await self._process_up_task
async def run(self):
self._process_up_task = asyncio.create_task(self._process_up_queue())
@@ -122,6 +124,7 @@ class PipelineTask:
await self._pipeline.cleanup()
# We just enqueue None to terminate the task gracefully.
self._process_up_task.cancel()
await self._process_up_task
async def _process_up_queue(self):
while True:
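
The hunks above add an `await` after each `cancel()` call. A minimal sketch of that pattern, assuming the task handler catches `asyncio.CancelledError` and leaves its loop (as the handlers elsewhere in this diff do). The names `worker` and `main` are illustrative and not part of pipecat.

```python
import asyncio


async def worker(queue: asyncio.Queue, log: list) -> None:
    # Illustrative handler: consume items until the task is cancelled.
    while True:
        try:
            item = await queue.get()
            log.append(item)
        except asyncio.CancelledError:
            # Swallow the cancellation so awaiting the task returns normally.
            log.append("stopped")
            break


async def main() -> list:
    log: list = []
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(worker(queue, log))
    await queue.put("frame")
    await asyncio.sleep(0)  # give the worker a chance to consume the item
    task.cancel()
    await task  # returns only once the handler has actually finished
    return log


result = asyncio.run(main())
```

If the handler did not catch `asyncio.CancelledError`, the `await task` line would raise it in the caller instead, so the two halves of the pattern need to be kept together.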


@@ -12,9 +12,9 @@ from pipecat.frames.frames import Frame
class FrameSerializer(ABC):
@abstractmethod
def serialize(self, frame: Frame) -> bytes:
def serialize(self, frame: Frame) -> str | bytes | None:
pass
@abstractmethod
def deserialize(self, data: bytes) -> Frame | None:
def deserialize(self, data: str | bytes) -> Frame | None:
pass


@@ -26,7 +26,7 @@ class ProtobufFrameSerializer(FrameSerializer):
def __init__(self):
pass
def serialize(self, frame: Frame) -> bytes:
def serialize(self, frame: Frame) -> str | bytes | None:
proto_frame = frame_protos.Frame()
if type(frame) not in self.SERIALIZABLE_TYPES:
raise ValueError(
@@ -41,7 +41,7 @@ class ProtobufFrameSerializer(FrameSerializer):
result = proto_frame.SerializeToString()
return result
def deserialize(self, data: bytes) -> Frame | None:
def deserialize(self, data: str | bytes) -> Frame | None:
"""Returns a Frame object from a Frame protobuf. Used to convert frames
passed over the wire as protobufs to Frame objects used in pipelines
and frame processors.


@@ -0,0 +1,55 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import base64
import json
from pipecat.frames.frames import AudioRawFrame, Frame
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.utils.audio import ulaw_8000_to_pcm_16000, pcm_16000_to_ulaw_8000
class TwilioFrameSerializer(FrameSerializer):
SERIALIZABLE_TYPES = {
AudioRawFrame: "audio",
}
def __init__(self):
self._sid = None
def serialize(self, frame: Frame) -> str | bytes | None:
if not isinstance(frame, AudioRawFrame):
return None
data = frame.audio
serialized_data = pcm_16000_to_ulaw_8000(data)
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "media",
"streamSid": self._sid,
"media": {
"payload": payload
}
}
return json.dumps(answer)
def deserialize(self, data: str | bytes) -> Frame | None:
message = json.loads(data)
if not self._sid:
self._sid = message["streamSid"] if "streamSid" in message else None
if message["event"] != "media":
return None
else:
payload_base64 = message["media"]["payload"]
payload = base64.b64decode(payload_base64)
deserialized_data = ulaw_8000_to_pcm_16000(payload)
audio_frame = AudioRawFrame(audio=deserialized_data, num_channels=1, sample_rate=16000)
return audio_frame
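
The serializer above wraps audio in a JSON `media` event with a base64 payload. A rough sketch of just that envelope, using only the standard library. The μ-law conversion is left out because it depends on `pipecat.utils.audio`, and `encode_media` and `decode_media` are hypothetical helper names, not pipecat functions. Unlike the code above, this sketch uses `.get()` so a message without an `event` key is ignored rather than raising.

```python
import base64
import json


def encode_media(stream_sid, audio: bytes) -> str:
    # Build the JSON "media" event with a base64-encoded payload.
    payload = base64.b64encode(audio).decode("utf-8")
    return json.dumps({
        "event": "media",
        "streamSid": stream_sid,
        "media": {"payload": payload},
    })


def decode_media(data):
    # Return the raw payload bytes, or None for non-media events.
    message = json.loads(data)
    if message.get("event") != "media":
        return None
    return base64.b64decode(message["media"]["payload"])


wire = encode_media("MZ-example", b"\x00\x01\x02\x03")
round_trip = decode_media(wire)
ignored = decode_media(json.dumps({"event": "start", "streamSid": "MZ-example"}))
```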


@@ -21,6 +21,7 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
TextFrame,
VisionImageRawFrame,
LLMFullResponseEndFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.audio import calculate_audio_volume
@@ -110,7 +111,9 @@ class TTSService(AIService):
text = frame.text
else:
self._current_sentence += frame.text
if self._current_sentence.strip().endswith((".", "?", "!")):
if self._current_sentence.strip().endswith(
(".", "?", "!")) and not self._current_sentence.strip().endswith(
("Mr.", "Mrs.", "Ms.", "Dr.")):
text = self._current_sentence.strip()
self._current_sentence = ""
@@ -134,6 +137,11 @@ class TTSService(AIService):
if self._current_sentence:
await self._push_tts_frames(self._current_sentence)
await self.push_frame(frame)
elif isinstance(frame, LLMFullResponseEndFrame):
if self._current_sentence:
await self._push_tts_frames(self._current_sentence.strip())
self._current_sentence = ""
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
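
The two `TTSService` hunks above buffer text until a sentence ends, skip titles such as "Dr.", and flush whatever is left when the LLM response ends. A standalone sketch of that logic, assuming a hypothetical `SentenceAggregator` class that is not part of pipecat:

```python
class SentenceAggregator:
    # Hypothetical helper: buffers streamed text until a sentence is complete.
    END = (".", "?", "!")
    TITLES = ("Mr.", "Mrs.", "Ms.", "Dr.")

    def __init__(self):
        self._current = ""

    def feed(self, text: str):
        # Return a full sentence when one is ready, otherwise None.
        self._current += text
        stripped = self._current.strip()
        if stripped.endswith(self.END) and not stripped.endswith(self.TITLES):
            self._current = ""
            return stripped
        return None

    def flush(self):
        # Called at the end of a response: emit any unterminated remainder.
        stripped = self._current.strip()
        self._current = ""
        return stripped or None


agg = SentenceAggregator()
out = [agg.feed(t) for t in ["Hello Dr.", " Smith.", " How are you"]]
tail = agg.flush()
```

Here `flush()` plays the role of the `LLMFullResponseEndFrame` branch: it covers responses that do not end in punctuation.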


@@ -7,26 +7,30 @@
import aiohttp
import asyncio
import io
import time
from PIL import Image
from typing import AsyncGenerator
from openai import AsyncAzureOpenAI
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame, URLImageRawFrame
from pipecat.services.ai_services import TTSService, ImageGenService
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, SystemFrame, TranscriptionFrame, URLImageRawFrame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService, TTSService, ImageGenService
from pipecat.services.openai import BaseOpenAILLMService
from loguru import logger
# See .env.example for Azure configuration needed
try:
from openai import AsyncAzureOpenAI
from azure.cognitiveservices.speech import (
SpeechSynthesizer,
SpeechConfig,
SpeechRecognizer,
SpeechSynthesizer,
ResultReason,
CancellationReason,
)
from azure.cognitiveservices.speech.audio import AudioStreamFormat, PushAudioInputStream
from azure.cognitiveservices.speech.dialog import AudioConfig
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
@@ -34,14 +38,35 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
class AzureLLMService(BaseOpenAILLMService):
def __init__(
self,
*,
api_key: str,
endpoint: str,
model: str,
api_version: str = "2023-12-01-preview"):
# Initialize variables before calling parent __init__() because that
# will call create_client() and we need those values there.
self._endpoint = endpoint
self._api_version = api_version
super().__init__(api_key=api_key, model=model)
def create_client(self, api_key=None, base_url=None, **kwargs):
return AsyncAzureOpenAI(
api_key=api_key,
azure_endpoint=self._endpoint,
api_version=self._api_version,
)
class AzureTTSService(TTSService):
def __init__(self, *, api_key: str, region: str, voice="en-US-SaraNeural", **kwargs):
super().__init__(**kwargs)
self.speech_config = SpeechConfig(subscription=api_key, region=region)
self.speech_synthesizer = SpeechSynthesizer(
speech_config=self.speech_config, audio_config=None
)
speech_config = SpeechConfig(subscription=api_key, region=region)
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
self._voice = voice
def can_generate_metrics(self) -> bool:
@@ -62,7 +87,7 @@ class AzureTTSService(TTSService):
f"{text}"
"</prosody></mstts:express-as></voice></speak> ")
result = await asyncio.to_thread(self.speech_synthesizer.speak_ssml, (ssml))
result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, (ssml))
if result.reason == ResultReason.SynthesizingAudioCompleted:
await self.stop_ttfb_metrics()
@@ -75,26 +100,74 @@ class AzureTTSService(TTSService):
logger.error(f"{self} error: {cancellation_details.error_details}")
class AzureLLMService(BaseOpenAILLMService):
class AzureSTTService(AIService):
def __init__(
self,
*,
api_key: str,
endpoint: str,
model: str,
api_version: str = "2023-12-01-preview"):
# Initialize variables before calling parent __init__() because that
# will call create_client() and we need those values there.
self._endpoint = endpoint
self._api_version = api_version
super().__init__(api_key=api_key, model=model)
region: str,
language="en-US",
sample_rate=16000,
channels=1,
**kwargs):
super().__init__(**kwargs)
def create_client(self, api_key=None, base_url=None):
return AsyncAzureOpenAI(
api_key=api_key,
azure_endpoint=self._endpoint,
api_version=self._api_version,
)
speech_config = SpeechConfig(subscription=api_key, region=region)
speech_config.speech_recognition_language = language
stream_format = AudioStreamFormat(samples_per_second=sample_rate, channels=channels)
self._audio_stream = PushAudioInputStream(stream_format)
audio_config = AudioConfig(stream=self._audio_stream)
self._speech_recognizer = SpeechRecognizer(
speech_config=speech_config, audio_config=audio_config)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._create_push_task()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
elif isinstance(frame, AudioRawFrame):
self._audio_stream.write(frame.audio)
else:
await self._push_queue.put((frame, direction))
async def start(self, frame: StartFrame):
self._speech_recognizer.start_continuous_recognition_async()
async def stop(self, frame: EndFrame):
self._speech_recognizer.stop_continuous_recognition_async()
await self._push_queue.put((frame, FrameDirection.DOWNSTREAM))
await self._push_frame_task
async def cancel(self, frame: CancelFrame):
self._speech_recognizer.stop_continuous_recognition_async()
self._push_frame_task.cancel()
await self._push_frame_task
def _create_push_task(self):
self._push_queue = asyncio.Queue()
self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
async def _push_frame_task_handler(self):
running = True
while running:
try:
(frame, direction) = await self._push_queue.get()
await self.push_frame(frame, direction)
running = not isinstance(frame, EndFrame)
except asyncio.CancelledError:
break
def _on_handle_recognized(self, event):
if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0:
direction = FrameDirection.DOWNSTREAM
frame = TranscriptionFrame(event.result.text, "", int(time.time_ns() / 1000000))
asyncio.run_coroutine_threadsafe(
self._push_queue.put((frame, direction)), self.get_event_loop())
class AzureImageGenServiceREST(ImageGenService):
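
`_on_handle_recognized` above is connected to a speech SDK event, so it hands its frame to the event loop with `asyncio.run_coroutine_threadsafe` rather than awaiting the queue directly. A small sketch of that hand-off, assuming the callback runs on a non-asyncio thread; here a plain `threading.Thread` stands in for the SDK, and `on_recognized` is an illustrative name.

```python
import asyncio
import threading


async def main() -> list:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def on_recognized(text: str) -> None:
        # Runs on a foreign thread, so schedule the put on the loop's thread.
        asyncio.run_coroutine_threadsafe(queue.put(text), loop)

    thread = threading.Thread(target=on_recognized, args=("hello",))
    thread.start()
    received = [await queue.get()]
    thread.join()
    return received


received = asyncio.run(main())
```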


@@ -89,6 +89,7 @@ class DeepgramTTSService(TTSService):
class DeepgramSTTService(AIService):
def __init__(self,
api_key: str,
url: str = "",
live_options: LiveOptions = LiveOptions(
encoding="linear16",
language="en-US",
@@ -104,7 +105,7 @@ class DeepgramSTTService(AIService):
self._live_options = live_options
self._client = DeepgramClient(
api_key, config=DeepgramClientOptions(options={"keepalive": "true"}))
api_key, config=DeepgramClientOptions(url=url, options={"keepalive": "true"}))
self._connection = self._client.listen.asynclive.v("1")
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
@@ -134,10 +135,11 @@ class DeepgramSTTService(AIService):
async def cancel(self, frame: CancelFrame):
await self._connection.finish()
self._push_frame_task.cancel()
await self._push_frame_task
def _create_push_task(self):
self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
self._push_queue = asyncio.Queue()
self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
async def _push_frame_task_handler(self):
running = True


@@ -5,7 +5,6 @@
#
import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor
@@ -33,8 +32,6 @@ class BaseInputTransport(FrameProcessor):
self._params = params
self._running = False
self._executor = ThreadPoolExecutor(max_workers=5)
# Create push frame task. This is the task that will push frames in
@@ -42,34 +39,21 @@ class BaseInputTransport(FrameProcessor):
self._create_push_task()
async def start(self, frame: StartFrame):
if self._running:
return
self._running = True
# Create audio input queue and thread if needed.
# Create audio input queue and task if needed.
if self._params.audio_in_enabled or self._params.vad_enabled:
self._audio_in_queue = queue.Queue()
self._audio_thread = self._loop.run_in_executor(
self._executor, self._audio_thread_handler)
self._audio_in_queue = asyncio.Queue()
self._audio_task = self.get_event_loop().create_task(self._audio_task_handler())
async def stop(self):
if not self._running:
return
# This will exit all threads.
self._running = False
# Wait for the threads to finish.
# Wait for the task to finish.
if self._params.audio_in_enabled or self._params.vad_enabled:
await self._audio_thread
self._push_frame_task.cancel()
self._audio_task.cancel()
await self._audio_task
def vad_analyzer(self) -> VADAnalyzer | None:
return self._params.vad_analyzer
def push_audio_frame(self, frame: AudioRawFrame):
async def push_audio_frame(self, frame: AudioRawFrame):
if self._params.audio_in_enabled or self._params.vad_enabled:
self._audio_in_queue.put_nowait(frame)
@@ -78,7 +62,8 @@ class BaseInputTransport(FrameProcessor):
#
async def cleanup(self):
pass
self._push_frame_task.cancel()
await self._push_frame_task
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -102,8 +87,8 @@ class BaseInputTransport(FrameProcessor):
def _create_push_task(self):
loop = self.get_event_loop()
self._push_frame_task = loop.create_task(self._push_frame_task_handler())
self._push_queue = asyncio.Queue()
self._push_frame_task = loop.create_task(self._push_frame_task_handler())
async def _internal_push_frame(
self,
@@ -129,6 +114,7 @@ class BaseInputTransport(FrameProcessor):
if isinstance(frame, UserStartedSpeakingFrame):
logger.debug("User started speaking")
self._push_frame_task.cancel()
await self._push_frame_task
self._create_push_task()
await self.push_frame(StartInterruptionFrame())
elif isinstance(frame, UserStoppedSpeakingFrame):
@@ -140,15 +126,16 @@ class BaseInputTransport(FrameProcessor):
# Audio input
#
def _vad_analyze(self, audio_frames: bytes) -> VADState:
async def _vad_analyze(self, audio_frames: bytes) -> VADState:
state = VADState.QUIET
vad_analyzer = self.vad_analyzer()
if vad_analyzer:
state = vad_analyzer.analyze_audio(audio_frames)
state = await self.get_event_loop().run_in_executor(
self._executor, vad_analyzer.analyze_audio, audio_frames)
return state
def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
new_vad_state = self._vad_analyze(audio_frames)
async def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
new_vad_state = await self._vad_analyze(audio_frames)
if new_vad_state != vad_state and new_vad_state != VADState.STARTING and new_vad_state != VADState.STOPPING:
frame = None
if new_vad_state == VADState.SPEAKING:
@@ -157,33 +144,29 @@ class BaseInputTransport(FrameProcessor):
frame = UserStoppedSpeakingFrame()
if frame:
future = asyncio.run_coroutine_threadsafe(
self._handle_interruptions(frame), self.get_event_loop())
future.result()
await self._handle_interruptions(frame)
vad_state = new_vad_state
return vad_state
def _audio_thread_handler(self):
async def _audio_task_handler(self):
vad_state: VADState = VADState.QUIET
while self._running:
while True:
try:
frame: AudioRawFrame = self._audio_in_queue.get(timeout=1)
frame: AudioRawFrame = await self._audio_in_queue.get()
audio_passthrough = True
# Check VAD and push event if necessary. We just care about
# changes from QUIET to SPEAKING and vice versa.
if self._params.vad_enabled:
vad_state = self._handle_vad(frame.audio, vad_state)
vad_state = await self._handle_vad(frame.audio, vad_state)
audio_passthrough = self._params.vad_audio_passthrough
# Push audio downstream if passthrough.
if audio_passthrough:
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self._loop)
future.result()
except queue.Empty:
pass
await self._internal_push_frame(frame)
except asyncio.CancelledError:
break
except BaseException as e:
logger.error(f"{self} error reading audio frames: {e}")


@@ -7,11 +7,6 @@
import asyncio
import itertools
import queue
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
from typing import List
@@ -42,67 +37,51 @@ class BaseOutputTransport(FrameProcessor):
self._params = params
self._running = False
self._executor = ThreadPoolExecutor(max_workers=5)
# These are the images that we should send to the camera at our desired
# framerate.
self._camera_images = None
# Create media threads queues.
if self._params.camera_out_enabled:
self._camera_out_queue = queue.Queue()
self._sink_queue = queue.Queue()
self._sink_thread = None
self._stopped_event = asyncio.Event()
self._is_interrupted = threading.Event()
# We will write 20ms audio at a time. If we receive long audio frames we
# will chunk them. This will help with interruption handling.
audio_bytes_10ms = int(self._params.audio_out_sample_rate / 100) * \
self._params.audio_out_channels * 2
self._audio_chunk_size = audio_bytes_10ms * 2
self._stopped_event = asyncio.Event()
# Create sink frame task. This is the task that will actually write
# audio or video frames. We write audio/video in a task so we can keep
# generating frames upstream while, for example, the audio is playing.
self._create_sink_task()
# Create push frame task. This is the task that will push frames in
# order. We also guarantee that all frames are pushed in the same task.
self._create_push_task()
async def start(self, frame: StartFrame):
if self._running:
return
self._running = True
loop = self.get_event_loop()
# Create queues and threads.
# Create media threads queues.
if self._params.camera_out_enabled:
self._camera_out_thread = loop.run_in_executor(
self._executor, self._camera_out_thread_handler)
self._sink_thread = loop.run_in_executor(self._executor, self._sink_thread_handler)
self._camera_out_queue = asyncio.Queue()
self._camera_out_task = self.get_event_loop().create_task(self._camera_out_task_handler())
async def stop(self):
if not self._running:
return
# This will exit all threads.
self._running = False
# Wait on the threads to finish.
if self._params.camera_out_enabled:
self._camera_out_task.cancel()
await self._camera_out_task
self._stopped_event.set()
def send_message(self, frame: TransportMessageFrame):
async def send_message(self, frame: TransportMessageFrame):
pass
def send_metrics(self, frame: MetricsFrame):
async def send_metrics(self, frame: MetricsFrame):
pass
def write_frame_to_camera(self, frame: ImageRawFrame):
async def write_frame_to_camera(self, frame: ImageRawFrame):
pass
def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes):
pass
#
@@ -110,12 +89,12 @@ class BaseOutputTransport(FrameProcessor):
#
async def cleanup(self):
# Wait on the threads to finish.
if self._params.camera_out_enabled:
await self._camera_out_thread
if self._sink_task:
self._sink_task.cancel()
await self._sink_task
if self._sink_thread:
await self._sink_thread
self._push_frame_task.cancel()
await self._push_frame_task
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -128,7 +107,7 @@ class BaseOutputTransport(FrameProcessor):
if isinstance(frame, StartFrame):
await self.start(frame)
await self.push_frame(frame, direction)
# EndFrame is managed in the queue handler.
# EndFrame is managed in the sink queue handler.
elif isinstance(frame, CancelFrame):
await self.stop()
await self.push_frame(frame, direction)
@@ -136,14 +115,14 @@ class BaseOutputTransport(FrameProcessor):
await self._handle_interruptions(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, MetricsFrame):
self.send_metrics(frame)
await self.send_metrics(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
elif isinstance(frame, AudioRawFrame):
await self._handle_audio(frame)
else:
self._sink_queue.put_nowait(frame)
await self._sink_queue.put(frame)
# If we are finishing, wait here until we have stopped, otherwise we might
# close things too early upstream. We need this event because we don't
@@ -156,50 +135,51 @@ class BaseOutputTransport(FrameProcessor):
return
if isinstance(frame, StartInterruptionFrame):
self._is_interrupted.set()
# Stop sink task.
self._sink_task.cancel()
await self._sink_task
self._create_sink_task()
# Stop push task.
self._push_frame_task.cancel()
await self._push_frame_task
self._create_push_task()
elif isinstance(frame, StopInterruptionFrame):
self._is_interrupted.clear()
async def _handle_audio(self, frame: AudioRawFrame):
audio = frame.audio
for i in range(0, len(audio), self._audio_chunk_size):
chunk = AudioRawFrame(audio[i: i + self._audio_chunk_size],
sample_rate=frame.sample_rate, num_channels=frame.num_channels)
self._sink_queue.put_nowait(chunk)
await self._sink_queue.put(chunk)
def _sink_thread_handler(self):
def _create_sink_task(self):
loop = self.get_event_loop()
self._sink_queue = asyncio.Queue()
self._sink_task = loop.create_task(self._sink_task_handler())
async def _sink_task_handler(self):
# Audio accumulation buffer
buffer = bytearray()
while self._running:
while True:
try:
frame = self._sink_queue.get(timeout=1)
if not self._is_interrupted.is_set():
if isinstance(frame, AudioRawFrame) and self._params.audio_out_enabled:
buffer.extend(frame.audio)
buffer = self._maybe_send_audio(buffer)
elif isinstance(frame, ImageRawFrame) and self._params.camera_out_enabled:
self._set_camera_image(frame)
elif isinstance(frame, SpriteFrame) and self._params.camera_out_enabled:
self._set_camera_images(frame.images)
elif isinstance(frame, TransportMessageFrame):
self.send_message(frame)
else:
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
future.result()
frame = await self._sink_queue.get()
if isinstance(frame, AudioRawFrame) and self._params.audio_out_enabled:
buffer.extend(frame.audio)
buffer = await self._maybe_send_audio(buffer)
elif isinstance(frame, ImageRawFrame) and self._params.camera_out_enabled:
await self._set_camera_image(frame)
elif isinstance(frame, SpriteFrame) and self._params.camera_out_enabled:
await self._set_camera_images(frame.images)
elif isinstance(frame, TransportMessageFrame):
await self.send_message(frame)
else:
# If we get interrupted just clear the output buffer.
buffer = bytearray()
await self._internal_push_frame(frame)
if isinstance(frame, EndFrame):
future = asyncio.run_coroutine_threadsafe(self.stop(), self.get_event_loop())
future.result()
await self.stop()
self._sink_queue.task_done()
except queue.Empty:
pass
except asyncio.CancelledError:
break
except BaseException as e:
logger.error(f"{self} error processing sink queue: {e}")
@@ -209,8 +189,8 @@ class BaseOutputTransport(FrameProcessor):
def _create_push_task(self):
loop = self.get_event_loop()
self._push_frame_task = loop.create_task(self._push_frame_task_handler())
self._push_queue = asyncio.Queue()
self._push_frame_task = loop.create_task(self._push_frame_task_handler())
async def _internal_push_frame(
self,
@@ -233,7 +213,7 @@ class BaseOutputTransport(FrameProcessor):
async def send_image(self, frame: ImageRawFrame | SpriteFrame):
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
def _draw_image(self, frame: ImageRawFrame):
async def _draw_image(self, frame: ImageRawFrame):
desired_size = (self._params.camera_out_width, self._params.camera_out_height)
if frame.size != desired_size:
@@ -243,32 +223,32 @@ class BaseOutputTransport(FrameProcessor):
f"{frame} does not have the expected size {desired_size}, resizing")
frame = ImageRawFrame(resized_image.tobytes(), resized_image.size, resized_image.format)
self.write_frame_to_camera(frame)
await self.write_frame_to_camera(frame)
def _set_camera_image(self, image: ImageRawFrame):
async def _set_camera_image(self, image: ImageRawFrame):
if self._params.camera_out_is_live:
self._camera_out_queue.put_nowait(image)
await self._camera_out_queue.put(image)
else:
self._camera_images = itertools.cycle([image])
def _set_camera_images(self, images: List[ImageRawFrame]):
async def _set_camera_images(self, images: List[ImageRawFrame]):
self._camera_images = itertools.cycle(images)
def _camera_out_thread_handler(self):
while self._running:
async def _camera_out_task_handler(self):
while True:
try:
if self._params.camera_out_is_live:
image = self._camera_out_queue.get(timeout=1)
self._draw_image(image)
image = await self._camera_out_queue.get()
await self._draw_image(image)
self._camera_out_queue.task_done()
elif self._camera_images:
image = next(self._camera_images)
self._draw_image(image)
time.sleep(1.0 / self._params.camera_out_framerate)
await self._draw_image(image)
await asyncio.sleep(1.0 / self._params.camera_out_framerate)
else:
time.sleep(1.0 / self._params.camera_out_framerate)
except queue.Empty:
pass
await asyncio.sleep(1.0 / self._params.camera_out_framerate)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"{self} error writing to camera: {e}")
@@ -279,12 +259,8 @@ class BaseOutputTransport(FrameProcessor):
async def send_audio(self, frame: AudioRawFrame):
await self.process_frame(frame, FrameDirection.DOWNSTREAM)
def _maybe_send_audio(self, buffer: bytearray) -> bytearray:
try:
if len(buffer) >= self._audio_chunk_size:
self.write_raw_audio_frames(bytes(buffer[:self._audio_chunk_size]))
buffer = buffer[self._audio_chunk_size:]
return buffer
except BaseException as e:
logger.error(f"{self} error writing audio frames: {e}")
return buffer
async def _maybe_send_audio(self, buffer: bytearray) -> bytearray:
if len(buffer) >= self._audio_chunk_size:
await self.write_raw_audio_frames(bytes(buffer[:self._audio_chunk_size]))
buffer = buffer[self._audio_chunk_size:]
return buffer
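
The output transport above writes audio in 20 ms chunks and splits longer frames. The arithmetic can be checked in isolation with the sketch below, which assumes 16-bit samples (2 bytes per sample per channel), as the `* 2` in the original implies. `audio_chunk_size` and `split_audio` are illustrative names, not pipecat functions.

```python
def audio_chunk_size(sample_rate: int, channels: int) -> int:
    # 10 ms is 1/100 of a second; 2 bytes per sample per channel.
    audio_bytes_10ms = int(sample_rate / 100) * channels * 2
    return audio_bytes_10ms * 2  # 20 ms worth of bytes


def split_audio(audio: bytes, chunk_size: int) -> list:
    # Slice a long buffer into chunk_size pieces; the last one may be shorter.
    return [audio[i:i + chunk_size] for i in range(0, len(audio), chunk_size)]


size = audio_chunk_size(16000, 1)
chunks = split_audio(bytes(1500), size)
lengths = [len(c) for c in chunks]
```

For 16 kHz mono this gives 640 bytes per chunk, so a 1500-byte frame becomes two full chunks and a 220-byte remainder.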


@@ -6,6 +6,8 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from pipecat.frames.frames import AudioRawFrame, StartFrame
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.transports.base_input import BaseInputTransport
@@ -43,26 +45,20 @@ class LocalAudioInputTransport(BaseInputTransport):
await super().start(frame)
self._in_stream.start_stream()
async def stop(self):
await super().stop()
self._in_stream.stop_stream()
async def cleanup(self):
await super().cleanup()
self._in_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._in_stream.is_active():
await asyncio.sleep(0.1)
self._in_stream.close()
await super().cleanup()
def _audio_in_callback(self, in_data, frame_count, time_info, status):
if not self._running:
return (None, pyaudio.paAbort)
frame = AudioRawFrame(audio=in_data,
sample_rate=self._params.audio_in_sample_rate,
num_channels=self._params.audio_in_channels)
self.push_audio_frame(frame)
asyncio.run_coroutine_threadsafe(self.push_audio_frame(frame), self.get_event_loop())
return (None, pyaudio.paContinue)
@@ -72,19 +68,29 @@ class LocalAudioOutputTransport(BaseOutputTransport):
def __init__(self, py_audio: pyaudio.PyAudio, params: TransportParams):
super().__init__(params)
self._executor = ThreadPoolExecutor(max_workers=5)
self._out_stream = py_audio.open(
format=py_audio.get_format_from_width(2),
channels=params.audio_out_channels,
rate=params.audio_out_sample_rate,
output=True)
def write_raw_audio_frames(self, frames: bytes):
self._out_stream.write(frames)
async def start(self, frame: StartFrame):
await super().start(frame)
self._out_stream.start_stream()
async def cleanup(self):
await super().cleanup()
self._out_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._out_stream.is_active():
await asyncio.sleep(0.1)
self._out_stream.close()
async def write_raw_audio_frames(self, frames: bytes):
await self.get_event_loop().run_in_executor(self._executor, self._out_stream.write, frames)
class LocalAudioTransport(BaseTransport):
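
`write_raw_audio_frames` above moves the blocking stream write onto a `ThreadPoolExecutor` so the event loop stays responsive. A minimal sketch of the same idea, where `blocking_write` is a made-up stand-in for a blocking device write and the 50 ms delay is arbitrary:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_write(sink: list, frames: bytes) -> int:
    # Stand-in for a blocking device write.
    time.sleep(0.05)
    sink.append(frames)
    return len(frames)


async def main():
    sink = []
    ticks = 0
    executor = ThreadPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    write = loop.run_in_executor(executor, blocking_write, sink, b"\x00" * 640)
    # The loop keeps running other work while the write is in progress.
    while not write.done():
        ticks += 1
        await asyncio.sleep(0.005)
    written = await write
    executor.shutdown()
    return sink, ticks, written


sink, ticks, written = asyncio.run(main())
```

The `ticks` counter only shows that the loop was free to do other work during the write; its exact value depends on timing.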


@@ -6,6 +6,8 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import tkinter as tk
@@ -53,25 +55,20 @@ class TkInputTransport(BaseInputTransport):
await super().start(frame)
self._in_stream.start_stream()
async def stop(self):
await super().stop()
self._in_stream.stop_stream()
async def cleanup(self):
await super().cleanup()
self._in_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._in_stream.is_active():
await asyncio.sleep(0.1)
self._in_stream.close()
def _audio_in_callback(self, in_data, frame_count, time_info, status):
if not self._running:
return (None, pyaudio.paAbort)
frame = AudioRawFrame(audio=in_data,
sample_rate=self._params.audio_in_sample_rate,
num_channels=self._params.audio_in_channels)
self.push_audio_frame(frame)
asyncio.run_coroutine_threadsafe(self.push_audio_frame(frame), self.get_event_loop())
return (None, pyaudio.paContinue)
@@ -81,6 +78,8 @@ class TkOutputTransport(BaseOutputTransport):
def __init__(self, tk_root: tk.Tk, py_audio: pyaudio.PyAudio, params: TransportParams):
super().__init__(params)
self._executor = ThreadPoolExecutor(max_workers=5)
self._out_stream = py_audio.open(
format=py_audio.get_format_from_width(2),
channels=params.audio_out_channels,
@@ -94,16 +93,24 @@ class TkOutputTransport(BaseOutputTransport):
self._image_label = tk.Label(tk_root, image=photo)
self._image_label.pack()
def write_raw_audio_frames(self, frames: bytes):
self._out_stream.write(frames)
def write_frame_to_camera(self, frame: ImageRawFrame):
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
async def start(self, frame: StartFrame):
await super().start(frame)
self._out_stream.start_stream()
async def cleanup(self):
await super().cleanup()
self._out_stream.stop_stream()
# This is not very pretty (taken from PyAudio docs).
while self._out_stream.is_active():
await asyncio.sleep(0.1)
self._out_stream.close()
async def write_raw_audio_frames(self, frames: bytes):
await self.get_event_loop().run_in_executor(self._executor, self._out_stream.write, frames)
async def write_frame_to_camera(self, frame: ImageRawFrame):
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
def _write_frame_to_tk(self, frame: ImageRawFrame):
width = frame.size[0]
height = frame.size[1]
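
Note on the hunk above: the blocking PyAudio `write` call is moved off the event loop with `run_in_executor`. A minimal sketch of that pattern follows. `blocking_write`, `sink` and `demo` are hypothetical stand-ins (not part of this change) so the example runs without an audio device.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_write(sink: list, frames: bytes) -> int:
    """Hypothetical stand-in for a blocking device write (e.g. a PyAudio stream)."""
    time.sleep(0.05)  # simulate the device blocking while it drains
    sink.append(frames)
    return len(frames)


async def write_raw_audio_frames(executor, sink: list, frames: bytes) -> int:
    loop = asyncio.get_running_loop()
    # The blocking call runs in a worker thread; the event loop stays responsive.
    return await loop.run_in_executor(executor, blocking_write, sink, frames)


async def demo() -> tuple[int, int]:
    sink: list = []
    ticks = 0

    async def ticker():
        # Counts how often the loop gets to run while the write is in flight.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    with ThreadPoolExecutor(max_workers=1) as executor:
        task = asyncio.create_task(ticker())
        written = await write_raw_audio_frames(executor, sink, b"\x00" * 320)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    return written, ticks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If the write were called directly inside the coroutine, `ticker` would not advance until the write returned.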


@@ -0,0 +1,160 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import io
import wave
from typing import Awaitable, Callable
from pydantic.main import BaseModel
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.frames.frames import AudioRawFrame, StartFrame
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.transports.base_input import BaseInputTransport
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from loguru import logger
try:
from fastapi import WebSocket
from starlette.websockets import WebSocketState
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use FastAPI websockets, you need to `pip install pipecat-ai[websocket]`.")
raise Exception(f"Missing module: {e}")
class FastAPIWebsocketParams(TransportParams):
add_wav_header: bool = False
audio_frame_size: int = 6400 # 200ms
serializer: FrameSerializer = TwilioFrameSerializer()
class FastAPIWebsocketCallbacks(BaseModel):
on_client_connected: Callable[[WebSocket], Awaitable[None]]
on_client_disconnected: Callable[[WebSocket], Awaitable[None]]
class FastAPIWebsocketInputTransport(BaseInputTransport):
def __init__(
self,
websocket: WebSocket,
params: FastAPIWebsocketParams,
callbacks: FastAPIWebsocketCallbacks,
**kwargs):
super().__init__(params, **kwargs)
self._websocket = websocket
self._params = params
self._callbacks = callbacks
async def start(self, frame: StartFrame):
await self._callbacks.on_client_connected(self._websocket)
await super().start(frame)
self._receive_task = self.get_event_loop().create_task(self._receive_messages())
async def stop(self):
if self._websocket.client_state != WebSocketState.DISCONNECTED:
await self._websocket.close()
await super().stop()
async def _receive_messages(self):
async for message in self._websocket.iter_text():
frame = self._params.serializer.deserialize(message)
if not frame:
continue
if isinstance(frame, AudioRawFrame):
await self.push_audio_frame(frame)
await self._callbacks.on_client_disconnected(self._websocket)
class FastAPIWebsocketOutputTransport(BaseOutputTransport):
def __init__(self, websocket: WebSocket, params: FastAPIWebsocketParams, **kwargs):
super().__init__(params, **kwargs)
self._websocket = websocket
self._params = params
self._audio_buffer = bytes()
async def write_raw_audio_frames(self, frames: bytes):
self._audio_buffer += frames
while len(self._audio_buffer) >= self._params.audio_frame_size:
frame = AudioRawFrame(
audio=self._audio_buffer[:self._params.audio_frame_size],
sample_rate=self._params.audio_out_sample_rate,
num_channels=self._params.audio_out_channels
)
if self._params.add_wav_header:
content = io.BytesIO()
ww = wave.open(content, "wb")
ww.setsampwidth(2)
ww.setnchannels(frame.num_channels)
ww.setframerate(frame.sample_rate)
ww.writeframes(frame.audio)
ww.close()
content.seek(0)
wav_frame = AudioRawFrame(
content.read(),
sample_rate=frame.sample_rate,
num_channels=frame.num_channels)
frame = wav_frame
payload = self._params.serializer.serialize(frame)
if payload:
await self._websocket.send_text(payload)
self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:]
class FastAPIWebsocketTransport(BaseTransport):
def __init__(
self,
websocket: WebSocket,
params: FastAPIWebsocketParams = FastAPIWebsocketParams(),
input_name: str | None = None,
output_name: str | None = None,
loop: asyncio.AbstractEventLoop | None = None):
super().__init__(input_name=input_name, output_name=output_name, loop=loop)
self._params = params
self._callbacks = FastAPIWebsocketCallbacks(
on_client_connected=self._on_client_connected,
on_client_disconnected=self._on_client_disconnected
)
self._input = FastAPIWebsocketInputTransport(
websocket, self._params, self._callbacks, name=self._input_name)
self._output = FastAPIWebsocketOutputTransport(
websocket, self._params, name=self._output_name)
# Register supported handlers. The user will only be able to register
# these handlers.
self._register_event_handler("on_client_connected")
self._register_event_handler("on_client_disconnected")
def input(self) -> FrameProcessor:
return self._input
def output(self) -> FrameProcessor:
return self._output
async def _on_client_connected(self, websocket):
await self._call_event_handler("on_client_connected", websocket)
async def _on_client_disconnected(self, websocket):
await self._call_event_handler("on_client_disconnected", websocket)
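
Note on `write_raw_audio_frames` in the new file above: outgoing audio is buffered, cut into `audio_frame_size` chunks, and optionally wrapped in a WAV container before serialization. The sketch below isolates those two steps. The helper names `split_frames` and `add_wav_header` are hypothetical, and the 16 kHz mono rate is an assumption chosen to be consistent with the `# 200ms` comment (6400 bytes / 2 bytes per sample / 16000 Hz = 0.2 s).

```python
import io
import wave

FRAME_SIZE = 6400  # bytes; assumed 16-bit mono at 16 kHz, i.e. 0.2 s of audio


def split_frames(buffer: bytes, frame_size: int = FRAME_SIZE):
    """Return (complete_frames, remainder) for a byte buffer."""
    frames = []
    while len(buffer) >= frame_size:
        frames.append(buffer[:frame_size])
        buffer = buffer[frame_size:]
    return frames, buffer


def add_wav_header(pcm: bytes, sample_rate: int, num_channels: int) -> bytes:
    """Wrap raw 16-bit PCM in a WAV container using the stdlib wave module."""
    content = io.BytesIO()
    with wave.open(content, "wb") as ww:
        ww.setsampwidth(2)  # 2 bytes per sample = 16-bit PCM
        ww.setnchannels(num_channels)
        ww.setframerate(sample_rate)
        ww.writeframes(pcm)
    return content.getvalue()


if __name__ == "__main__":
    frames, rest = split_frames(b"\x00" * 15000)
    print(len(frames), len(rest))
    print(len(add_wav_header(frames[0], 16000, 1)))
```

The remainder is what stays in the buffer until the next call delivers enough bytes to complete another frame.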


@@ -4,11 +4,9 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import io
import wave
import websockets
from typing import Awaitable, Callable
from pydantic.main import BaseModel
@@ -23,6 +21,13 @@ from pipecat.transports.base_transport import BaseTransport, TransportParams
from loguru import logger
try:
import websockets
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use websockets, you need to `pip install pipecat-ai[websocket]`.")
raise Exception(f"Missing module: {e}")
class WebsocketServerParams(TransportParams):
add_wav_header: bool = False
@@ -88,7 +93,7 @@ class WebsocketServerInputTransport(BaseInputTransport):
continue
if isinstance(frame, AudioRawFrame):
self.push_audio_frame(frame)
await self.push_audio_frame(frame)
else:
await self._internal_push_frame(frame)
@@ -118,7 +123,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
logger.warning("Only one client allowed, using new connection")
self._websocket = websocket
def write_raw_audio_frames(self, frames: bytes):
async def write_raw_audio_frames(self, frames: bytes):
self._audio_buffer += frames
while len(self._audio_buffer) >= self._params.audio_frame_size:
frame = AudioRawFrame(
@@ -144,9 +149,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
proto = self._params.serializer.serialize(frame)
future = asyncio.run_coroutine_threadsafe(
self._websocket.send(proto), self.get_event_loop())
future.result()
await self._websocket.send(proto)
self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:]


@@ -6,11 +6,10 @@
import aiohttp
import asyncio
import queue
import time
from dataclasses import dataclass
from typing import Any, Callable, Mapping
from typing import Any, Awaitable, Callable, Mapping
from concurrent.futures import ThreadPoolExecutor
from daily import (
@@ -108,19 +107,27 @@ class DailyParams(TransportParams):
class DailyCallbacks(BaseModel):
on_joined: Callable[[Mapping[str, Any]], None]
on_left: Callable[[], None]
on_error: Callable[[str], None]
on_app_message: Callable[[Any, str], None]
on_call_state_updated: Callable[[str], None]
on_dialin_ready: Callable[[str], None]
on_dialout_connected: Callable[[Any], None]
on_dialout_stopped: Callable[[Any], None]
on_dialout_error: Callable[[Any], None]
on_dialout_warning: Callable[[Any], None]
on_first_participant_joined: Callable[[Mapping[str, Any]], None]
on_participant_joined: Callable[[Mapping[str, Any]], None]
on_participant_left: Callable[[Mapping[str, Any], str], None]
on_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
on_left: Callable[[], Awaitable[None]]
on_error: Callable[[str], Awaitable[None]]
on_app_message: Callable[[Any, str], Awaitable[None]]
on_call_state_updated: Callable[[str], Awaitable[None]]
on_dialin_ready: Callable[[str], Awaitable[None]]
on_dialout_answered: Callable[[Any], Awaitable[None]]
on_dialout_connected: Callable[[Any], Awaitable[None]]
on_dialout_stopped: Callable[[Any], Awaitable[None]]
on_dialout_error: Callable[[Any], Awaitable[None]]
on_dialout_warning: Callable[[Any], Awaitable[None]]
on_first_participant_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
on_participant_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
on_participant_left: Callable[[Mapping[str, Any], str], Awaitable[None]]
def completion_callback(future):
def _callback(*args):
if not future.cancelled():
future.get_loop().call_soon_threadsafe(future.set_result, *args)
return _callback
class DailyTransportClient(EventHandler):
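
Note on `completion_callback` above: it bridges a callback-style API that fires on another thread to an asyncio future. A self-contained sketch of the same idea follows. `fake_native_read` is a hypothetical stand-in for a non-blocking SDK call and is not the daily-python API.

```python
import asyncio
import threading


def completion_callback(future: asyncio.Future):
    def _callback(*args):
        # Invoked on a foreign thread: hand the result over to the loop thread.
        if not future.cancelled():
            future.get_loop().call_soon_threadsafe(future.set_result, *args)
    return _callback


def fake_native_read(num_frames: int, completion) -> None:
    """Hypothetical callback-style API: returns at once, completes on a thread."""
    def worker():
        completion(b"\x00" * (num_frames * 2))  # 16-bit samples
    threading.Thread(target=worker, daemon=True).start()


async def read_frames(num_frames: int) -> bytes:
    future = asyncio.get_running_loop().create_future()
    fake_native_read(num_frames, completion=completion_callback(future))
    return await future


if __name__ == "__main__":
    print(len(asyncio.run(read_frames(160))))
```

Because `set_result` takes a single value, this helper only fits completions that pass exactly one argument. A completion that passes two values, such as the `(data, error)` pair used for joining, needs its own handler that packs them into a tuple.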
@@ -160,7 +167,6 @@ class DailyTransportClient(EventHandler):
self._joined = False
self._joining = False
self._leaving = False
self._sync_response = {k: queue.Queue() for k in ["join", "leave"]}
self._executor = ThreadPoolExecutor(max_workers=5)
@@ -173,10 +179,16 @@ class DailyTransportClient(EventHandler):
color_format=self._params.camera_out_color_format)
self._mic: VirtualMicrophoneDevice = Daily.create_microphone_device(
"mic", sample_rate=self._params.audio_out_sample_rate, channels=self._params.audio_out_channels)
"mic",
sample_rate=self._params.audio_out_sample_rate,
channels=self._params.audio_out_channels,
non_blocking=True)
self._speaker: VirtualSpeakerDevice = Daily.create_speaker_device(
"speaker", sample_rate=self._params.audio_in_sample_rate, channels=self._params.audio_in_channels)
"speaker",
sample_rate=self._params.audio_in_sample_rate,
channels=self._params.audio_in_channels,
non_blocking=True)
Daily.select_speaker_device("speaker")
@property
@@ -186,30 +198,39 @@ class DailyTransportClient(EventHandler):
def set_callbacks(self, callbacks: DailyCallbacks):
self._callbacks = callbacks
def send_message(self, frame: DailyTransportMessageFrame):
self._client.send_app_message(frame.message, frame.participant_id)
async def send_message(self, frame: DailyTransportMessageFrame):
future = self._loop.create_future()
self._client.send_app_message(
frame.message,
frame.participant_id,
completion=completion_callback(future))
await future
def read_next_audio_frame(self) -> AudioRawFrame | None:
async def read_next_audio_frame(self) -> AudioRawFrame | None:
sample_rate = self._params.audio_in_sample_rate
num_channels = self._params.audio_in_channels
if self._other_participant_has_joined:
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
audio = self._speaker.read_frames(num_frames)
future = self._loop.create_future()
self._speaker.read_frames(num_frames, completion=completion_callback(future))
audio = await future
return AudioRawFrame(audio=audio, sample_rate=sample_rate, num_channels=num_channels)
else:
# If no one has ever joined the meeting `read_frames()` would block,
# instead we just wait a bit. daily-python should probably return
# silence instead.
time.sleep(0.01)
await asyncio.sleep(0.01)
return None
def write_raw_audio_frames(self, frames: bytes):
self._mic.write_frames(frames)
async def write_raw_audio_frames(self, frames: bytes):
future = self._loop.create_future()
self._mic.write_frames(frames, completion=completion_callback(future))
await future
def write_frame_to_camera(self, frame: ImageRawFrame):
async def write_frame_to_camera(self, frame: ImageRawFrame):
self._camera.write_frame(frame.image)
async def join(self):
@@ -217,13 +238,10 @@ class DailyTransportClient(EventHandler):
if self._joined or self._joining:
return
self._joining = True
await self._loop.run_in_executor(self._executor, self._join)
def _join(self):
logger.info(f"Joining {self._room_url}")
self._joining = True
# For performance reasons, never subscribe to video streams (unless a
# video renderer is registered).
self._client.update_subscription_profiles({
@@ -235,10 +253,42 @@ class DailyTransportClient(EventHandler):
self._client.set_user_name(self._bot_name)
try:
(data, error) = await self._join()
if not error:
self._joined = True
self._joining = False
logger.info(f"Joined {self._room_url}")
if self._token and self._params.transcription_enabled:
logger.info(
f"Enabling transcription with settings {self._params.transcription_settings}")
self._client.start_transcription(
self._params.transcription_settings.model_dump())
await self._callbacks.on_joined(data["participants"]["local"])
else:
error_msg = f"Error joining {self._room_url}: {error}"
logger.error(error_msg)
await self._callbacks.on_error(error_msg)
except asyncio.TimeoutError:
error_msg = f"Time out joining {self._room_url}"
logger.error(error_msg)
await self._callbacks.on_error(error_msg)
async def _join(self):
future = self._loop.create_future()
def handle_join_response(data, error):
if not future.cancelled():
future.get_loop().call_soon_threadsafe(future.set_result, (data, error))
self._client.join(
self._room_url,
self._token,
completion=self._call_joined,
completion=handle_join_response,
client_settings={
"inputs": {
"camera": {
@@ -274,33 +324,7 @@ class DailyTransportClient(EventHandler):
},
})
self._handle_join_response()
def _handle_join_response(self):
try:
(data, error) = self._sync_response["join"].get(timeout=10)
if not error:
self._joined = True
self._joining = False
logger.info(f"Joined {self._room_url}")
if self._token and self._params.transcription_enabled:
logger.info(
f"Enabling transcription with settings {self._params.transcription_settings}")
self._client.start_transcription(
self._params.transcription_settings.model_dump())
self._callbacks.on_joined(data["participants"]["local"])
else:
error_msg = f"Error joining {self._room_url}: {error}"
logger.error(error_msg)
self._callbacks.on_error(error_msg)
self._sync_response["join"].task_done()
except queue.Empty:
error_msg = f"Time out joining {self._room_url}"
logger.error(error_msg)
self._callbacks.on_error(error_msg)
return await asyncio.wait_for(future, timeout=10)
async def leave(self):
# Transport not joined, ignore.
@@ -310,34 +334,36 @@ class DailyTransportClient(EventHandler):
self._joined = False
self._leaving = True
await self._loop.run_in_executor(self._executor, self._leave)
def _leave(self):
logger.info(f"Leaving {self._room_url}")
if self._params.transcription_enabled:
self._client.stop_transcription()
self._client.leave(completion=self._call_left)
self._handle_leave_response()
def _handle_leave_response(self):
try:
error = self._sync_response["leave"].get(timeout=10)
error = await self._leave()
if not error:
self._leaving = False
logger.info(f"Left {self._room_url}")
self._callbacks.on_left()
await self._callbacks.on_left()
else:
error_msg = f"Error leaving {self._room_url}: {error}"
logger.error(error_msg)
self._callbacks.on_error(error_msg)
self._sync_response["leave"].task_done()
except queue.Empty:
await self._callbacks.on_error(error_msg)
except asyncio.TimeoutError:
error_msg = f"Time out leaving {self._room_url}"
logger.error(error_msg)
self._callbacks.on_error(error_msg)
await self._callbacks.on_error(error_msg)
async def _leave(self):
future = self._loop.create_future()
def handle_leave_response(error):
if not future.cancelled():
future.get_loop().call_soon_threadsafe(future.set_result, error)
self._client.leave(completion=handle_leave_response)
return await asyncio.wait_for(future, timeout=10)
async def cleanup(self):
await self._loop.run_in_executor(self._executor, self._cleanup)
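
Note on the `_join` and `_leave` rewrites above: the queue with `get(timeout=10)` is replaced by a future awaited through `asyncio.wait_for`. The sketch below shows both the success and the timeout path. `fake_leave` is a hypothetical stand-in for a client call that reports completion on another thread after a delay.

```python
import asyncio
import threading


def fake_leave(completion, delay: float) -> None:
    """Hypothetical SDK call: invokes completion(error) from a timer thread."""
    timer = threading.Timer(delay, completion, args=(None,))
    timer.daemon = True
    timer.start()


async def leave(delay: float, timeout: float) -> str:
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def handle_leave_response(error):
        # wait_for cancels the future on timeout, so a late reply is ignored.
        if not future.cancelled():
            loop.call_soon_threadsafe(future.set_result, error)

    fake_leave(handle_leave_response, delay)
    try:
        error = await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        return "timeout"
    return "left" if not error else f"error: {error}"


if __name__ == "__main__":
    print(asyncio.run(leave(0.01, 1.0)))
    print(asyncio.run(leave(0.5, 0.05)))
```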
@@ -399,25 +425,28 @@ class DailyTransportClient(EventHandler):
#
def on_app_message(self, message: Any, sender: str):
self._callbacks.on_app_message(message, sender)
self._call_async_callback(self._callbacks.on_app_message, message, sender)
def on_call_state_updated(self, state: str):
self._callbacks.on_call_state_updated(state)
self._call_async_callback(self._callbacks.on_call_state_updated, state)
def on_dialin_ready(self, sip_endpoint: str):
self._callbacks.on_dialin_ready(sip_endpoint)
self._call_async_callback(self._callbacks.on_dialin_ready, sip_endpoint)
def on_dialout_answered(self, data: Any):
self._call_async_callback(self._callbacks.on_dialout_answered, data)
def on_dialout_connected(self, data: Any):
self._callbacks.on_dialout_connected(data)
self._call_async_callback(self._callbacks.on_dialout_connected, data)
def on_dialout_stopped(self, data: Any):
self._callbacks.on_dialout_stopped(data)
self._call_async_callback(self._callbacks.on_dialout_stopped, data)
def on_dialout_error(self, data: Any):
self._callbacks.on_dialout_error(data)
self._call_async_callback(self._callbacks.on_dialout_error, data)
def on_dialout_warning(self, data: Any):
self._callbacks.on_dialout_warning(data)
self._call_async_callback(self._callbacks.on_dialout_warning, data)
def on_participant_joined(self, participant):
id = participant["id"]
@@ -425,15 +454,15 @@ class DailyTransportClient(EventHandler):
if not self._other_participant_has_joined:
self._other_participant_has_joined = True
self._callbacks.on_first_participant_joined(participant)
self._call_async_callback(self._callbacks.on_first_participant_joined, participant)
self._callbacks.on_participant_joined(participant)
self._call_async_callback(self._callbacks.on_participant_joined, participant)
def on_participant_left(self, participant, reason):
id = participant["id"]
logger.info(f"Participant left {id}")
self._callbacks.on_participant_left(participant, reason)
self._call_async_callback(self._callbacks.on_participant_left, participant, reason)
def on_transcription_message(self, message: Mapping[str, Any]):
participant_id = ""
@@ -442,7 +471,7 @@ class DailyTransportClient(EventHandler):
if participant_id in self._transcription_renderers:
callback = self._transcription_renderers[participant_id]
callback(participant_id, message)
self._call_async_callback(callback, participant_id, message)
def on_transcription_error(self, message):
logger.error(f"Transcription error: {message}")
@@ -457,18 +486,19 @@ class DailyTransportClient(EventHandler):
# Daily (CallClient callbacks)
#
def _call_joined(self, data, error):
self._sync_response["join"].put((data, error))
def _call_left(self, error):
self._sync_response["leave"].put(error)
def _video_frame_received(self, participant_id, video_frame):
callback = self._video_renderers[participant_id]
callback(participant_id,
video_frame.buffer,
(video_frame.width, video_frame.height),
video_frame.color_format)
self._call_async_callback(
callback,
participant_id,
video_frame.buffer,
(video_frame.width,
video_frame.height),
video_frame.color_format)
def _call_async_callback(self, callback, *args):
future = asyncio.run_coroutine_threadsafe(callback(*args), self._loop)
future.result()
class DailyInputTransport(BaseInputTransport):
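
Note on `_call_async_callback` above: event handlers are invoked on a thread that is not the event loop thread, so coroutine callbacks are scheduled with `asyncio.run_coroutine_threadsafe` and the calling thread blocks on the result. A minimal sketch, with a hypothetical `on_participant_joined` coroutine and a plain thread playing the role of the SDK thread:

```python
import asyncio
import threading


async def on_participant_joined(log: list, participant: dict) -> str:
    # Runs on the event loop thread.
    log.append(participant["id"])
    return participant["id"]


def call_async_callback(loop, callback, *args):
    # Must be called from a thread other than the loop thread: blocking on
    # future.result() inside the loop thread would deadlock.
    future = asyncio.run_coroutine_threadsafe(callback(*args), loop)
    return future.result()


async def demo() -> tuple[list, list]:
    loop = asyncio.get_running_loop()
    log: list = []
    results: list = []

    def sdk_thread():
        results.append(
            call_async_callback(loop, on_participant_joined, log, {"id": "abc"}))

    thread = threading.Thread(target=sdk_thread)
    thread.start()
    # Join in an executor so the loop keeps running and can serve the callback.
    await loop.run_in_executor(None, thread.join)
    return log, results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```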
@@ -487,8 +517,6 @@ class DailyInputTransport(BaseInputTransport):
num_channels=self._params.audio_in_channels)
async def start(self, frame: StartFrame):
if self._running:
return
# Parent start.
await super().start(frame)
# Join the room.
@@ -496,19 +524,17 @@ class DailyInputTransport(BaseInputTransport):
# Create audio task. It reads audio frames from Daily and pushes them
# internally for VAD processing.
if self._params.audio_in_enabled or self._params.vad_enabled:
self._audio_in_thread = self._loop.run_in_executor(
self._executor, self._audio_in_thread_handler)
self._audio_in_task = self.get_event_loop().create_task(self._audio_in_task_handler())
async def stop(self):
if not self._running:
return
# Parent stop. This will set _running to False.
# Parent stop.
await super().stop()
# Leave the room.
await self._client.leave()
# Stop audio thread.
if self._params.audio_in_enabled or self._params.vad_enabled:
await self._audio_in_thread
self._audio_in_task.cancel()
await self._audio_in_task
async def cleanup(self):
await super().cleanup()
@@ -531,26 +557,25 @@ class DailyInputTransport(BaseInputTransport):
# Frames
#
def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
future.result()
async def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame):
await self._internal_push_frame(frame)
def push_app_message(self, message: Any, sender: str):
async def push_app_message(self, message: Any, sender: str):
frame = DailyTransportMessageFrame(message=message, participant_id=sender)
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
future.result()
await self._internal_push_frame(frame)
#
# Audio in
#
def _audio_in_thread_handler(self):
while self._running:
frame = self._client.read_next_audio_frame()
if frame:
self.push_audio_frame(frame)
async def _audio_in_task_handler(self):
while True:
try:
frame = await self._client.read_next_audio_frame()
if frame:
await self.push_audio_frame(frame)
except asyncio.CancelledError:
break
#
# Camera in
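
Note on `_audio_in_task_handler` above: the reader thread becomes an asyncio task that is stopped with `cancel()` followed by `await`. Because the handler catches `CancelledError` and breaks out of the loop, awaiting the task returns normally instead of raising. A sketch with hypothetical `read_next_frame` and `push_frame` coroutines:

```python
import asyncio


async def audio_in_task_handler(read_next_frame, push_frame):
    while True:
        try:
            frame = await read_next_frame()
            if frame:
                await push_frame(frame)
        except asyncio.CancelledError:
            # Cancellation is swallowed, so the task finishes cleanly.
            break


async def demo() -> tuple[int, bool, bool]:
    received: list = []

    async def read_next_frame():
        await asyncio.sleep(0.001)  # hypothetical source of audio frames
        return b"\x00" * 320

    async def push_frame(frame):
        received.append(frame)

    task = asyncio.create_task(audio_in_task_handler(read_next_frame, push_frame))
    await asyncio.sleep(0.05)
    task.cancel()
    await task  # does not raise: the handler exited its loop on cancellation
    return len(received), task.done(), task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```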
@@ -580,7 +605,7 @@ class DailyInputTransport(BaseInputTransport):
if participant_id in self._video_renderers:
self._video_renderers[participant_id]["render_next_frame"] = True
def _on_participant_video_frame(self, participant_id: str, buffer, size, format):
async def _on_participant_video_frame(self, participant_id: str, buffer, size, format):
render_frame = False
curr_time = time.time()
@@ -600,9 +625,7 @@ class DailyInputTransport(BaseInputTransport):
image=buffer,
size=size,
format=format)
future = asyncio.run_coroutine_threadsafe(
self._internal_push_frame(frame), self.get_event_loop())
future.result()
await self._internal_push_frame(frame)
self._video_renderers[participant_id]["timestamp"] = curr_time
@@ -615,17 +638,13 @@ class DailyOutputTransport(BaseOutputTransport):
self._client = client
async def start(self, frame: StartFrame):
if self._running:
return
# Parent start.
await super().start(frame)
# Join the room.
await self._client.join()
async def stop(self):
if not self._running:
return
# Parent stop. This will set _running to False.
# Parent stop.
await super().stop()
# Leave the room.
await self._client.leave()
@@ -634,10 +653,10 @@ class DailyOutputTransport(BaseOutputTransport):
await super().cleanup()
await self._client.cleanup()
def send_message(self, frame: DailyTransportMessageFrame):
self._client.send_message(frame)
async def send_message(self, frame: DailyTransportMessageFrame):
await self._client.send_message(frame)
def send_metrics(self, frame: MetricsFrame):
async def send_metrics(self, frame: MetricsFrame):
ttfb = [{"name": n, "time": t} for n, t in frame.ttfb.items()]
message = DailyTransportMessageFrame(message={
"type": "pipecat-metrics",
@@ -645,13 +664,13 @@ class DailyOutputTransport(BaseOutputTransport):
"ttfb": ttfb
},
})
self._client.send_message(message)
await self._client.send_message(message)
def write_raw_audio_frames(self, frames: bytes):
self._client.write_raw_audio_frames(frames)
async def write_raw_audio_frames(self, frames: bytes):
await self._client.write_raw_audio_frames(frames)
def write_frame_to_camera(self, frame: ImageRawFrame):
self._client.write_frame_to_camera(frame)
async def write_frame_to_camera(self, frame: ImageRawFrame):
await self._client.write_frame_to_camera(frame)
class DailyTransport(BaseTransport):
@@ -674,6 +693,7 @@ class DailyTransport(BaseTransport):
on_app_message=self._on_app_message,
on_call_state_updated=self._on_call_state_updated,
on_dialin_ready=self._on_dialin_ready,
on_dialout_answered=self._on_dialout_answered,
on_dialout_connected=self._on_dialout_connected,
on_dialout_stopped=self._on_dialout_stopped,
on_dialout_error=self._on_dialout_error,
@@ -696,6 +716,7 @@ class DailyTransport(BaseTransport):
self._register_event_handler("on_app_message")
self._register_event_handler("on_call_state_updated")
self._register_event_handler("on_dialin_ready")
self._register_event_handler("on_dialout_answered")
self._register_event_handler("on_dialout_connected")
self._register_event_handler("on_dialout_stopped")
self._register_event_handler("on_dialout_error")
@@ -768,24 +789,24 @@ class DailyTransport(BaseTransport):
self._input.capture_participant_video(
participant_id, framerate, video_source, color_format)
def _on_joined(self, participant):
self._call_async_event_handler("on_joined", participant)
async def _on_joined(self, participant):
await self._call_event_handler("on_joined", participant)
def _on_left(self):
self._call_async_event_handler("on_left")
async def _on_left(self):
await self._call_event_handler("on_left")
def _on_error(self, error):
async def _on_error(self, error):
# TODO(aleix): Report error to input/output transports. The one managing
# the client should report the error.
pass
def _on_app_message(self, message: Any, sender: str):
async def _on_app_message(self, message: Any, sender: str):
if self._input:
self._input.push_app_message(message, sender)
self._call_async_event_handler("on_app_message", message, sender)
await self._input.push_app_message(message, sender)
await self._call_event_handler("on_app_message", message, sender)
def _on_call_state_updated(self, state: str):
self._call_async_event_handler("on_call_state_updated", state)
async def _on_call_state_updated(self, state: str):
await self._call_event_handler("on_call_state_updated", state)
async def _handle_dialin_ready(self, sip_endpoint: str):
if not self._params.dialin_settings:
@@ -818,33 +839,36 @@ class DailyTransport(BaseTransport):
except BaseException as e:
logger.error(f"Error handling dialin-ready event ({url}): {e}")
def _on_dialin_ready(self, sip_endpoint):
async def _on_dialin_ready(self, sip_endpoint):
if self._params.dialin_settings:
asyncio.run_coroutine_threadsafe(self._handle_dialin_ready(sip_endpoint), self._loop)
self._call_async_event_handler("on_dialin_ready", sip_endpoint)
await self._handle_dialin_ready(sip_endpoint)
await self._call_event_handler("on_dialin_ready", sip_endpoint)
def _on_dialout_connected(self, data):
self._call_async_event_handler("on_dialout_connected", data)
async def _on_dialout_answered(self, data):
await self._call_event_handler("on_dialout_answered", data)
def _on_dialout_stopped(self, data):
self._call_async_event_handler("on_dialout_stopped", data)
async def _on_dialout_connected(self, data):
await self._call_event_handler("on_dialout_connected", data)
def _on_dialout_error(self, data):
self._call_async_event_handler("on_dialout_error", data)
async def _on_dialout_stopped(self, data):
await self._call_event_handler("on_dialout_stopped", data)
def _on_dialout_warning(self, data):
self._call_async_event_handler("on_dialout_warning", data)
async def _on_dialout_error(self, data):
await self._call_event_handler("on_dialout_error", data)
def _on_participant_joined(self, participant):
self._call_async_event_handler("on_participant_joined", participant)
async def _on_dialout_warning(self, data):
await self._call_event_handler("on_dialout_warning", data)
def _on_participant_left(self, participant, reason):
self._call_async_event_handler("on_participant_left", participant, reason)
async def _on_participant_joined(self, participant):
await self._call_event_handler("on_participant_joined", participant)
def _on_first_participant_joined(self, participant):
self._call_async_event_handler("on_first_participant_joined", participant)
async def _on_participant_left(self, participant, reason):
await self._call_event_handler("on_participant_left", participant, reason)
def _on_transcription_message(self, participant_id, message):
async def _on_first_participant_joined(self, participant):
await self._call_event_handler("on_first_participant_joined", participant)
async def _on_transcription_message(self, participant_id, message):
text = message["text"]
timestamp = message["timestamp"]
is_final = message["rawResponse"]["is_final"]
@@ -855,9 +879,4 @@ class DailyTransport(BaseTransport):
frame = InterimTranscriptionFrame(text, participant_id, timestamp)
if self._input:
self._input.push_transcription_frame(frame)
def _call_async_event_handler(self, event_name: str, *args, **kwargs):
future = asyncio.run_coroutine_threadsafe(
self._call_event_handler(event_name, *args, **kwargs), self._loop)
future.result()
await self._input.push_transcription_frame(frame)


@@ -4,6 +4,7 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import audioop
import numpy as np
import pyloudnorm as pyln
@@ -31,3 +32,23 @@ def calculate_audio_volume(audio: bytes, sample_rate: int) -> float:
def exp_smoothing(value: float, prev_value: float, factor: float) -> float:
return prev_value + factor * (value - prev_value)
def ulaw_8000_to_pcm_16000(ulaw_8000_bytes):
# Convert μ-law to PCM
pcm_8000_bytes = audioop.ulaw2lin(ulaw_8000_bytes, 2)
# Resample from 8000 Hz to 16000 Hz
pcm_16000_bytes = audioop.ratecv(pcm_8000_bytes, 2, 1, 8000, 16000, None)[0]
return pcm_16000_bytes
def pcm_16000_to_ulaw_8000(pcm_16000_bytes):
# Resample from 16000 Hz to 8000 Hz
pcm_8000_bytes = audioop.ratecv(pcm_16000_bytes, 2, 1, 16000, 8000, None)[0]
# Convert PCM to μ-law
ulaw_8000_bytes = audioop.lin2ulaw(pcm_8000_bytes, 2)
return ulaw_8000_bytes
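
Note on the two helpers above: they rely on the stdlib `audioop` module, which was deprecated in Python 3.11 and removed in Python 3.13. To illustrate what the μ-law to PCM step computes, here is a pure-Python sketch of G.711 μ-law decoding. The function names are hypothetical, little-endian output is an assumption, and the sample-repeat upsampler is a deliberately naive substitute for `audioop.ratecv`, not an equivalent of it.

```python
def ulaw_to_linear(u: int) -> int:
    """Decode one G.711 mu-law byte to a signed 16-bit linear sample."""
    u = ~u & 0xFF  # mu-law bytes are stored bit-inverted
    # 4-bit mantissa scaled by the 3-bit exponent, with the 0x84 bias applied.
    t = (((u & 0x0F) << 3) + 0x84) << ((u & 0x70) >> 4)
    return (0x84 - t) if (u & 0x80) else (t - 0x84)


def ulaw_bytes_to_pcm16(data: bytes) -> bytes:
    """Decode mu-law bytes to 16-bit little-endian PCM (2 bytes per sample)."""
    out = bytearray()
    for b in data:
        out += ulaw_to_linear(b).to_bytes(2, "little", signed=True)
    return bytes(out)


def upsample_2x_repeat(pcm16: bytes) -> bytes:
    """Naive 2x upsampling by repeating each 16-bit sample (no filtering)."""
    out = bytearray()
    for i in range(0, len(pcm16), 2):
        out += pcm16[i:i + 2] * 2
    return bytes(out)


if __name__ == "__main__":
    pcm = ulaw_bytes_to_pcm16(bytes([0xFF, 0x80, 0x00]))
    print(len(pcm), len(upsample_2x_repeat(pcm)))
```

One μ-law byte becomes two PCM bytes, and doubling the sample rate doubles the byte count again, so 8 kHz μ-law input grows roughly fourfold on its way to 16 kHz 16-bit PCM.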