Compare commits

...

67 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
065cfb2aca Merge pull request #280 from pipecat-ai/aleix/library-updates-070224
library updates 070224 and pipecat 0.0.36
2024-07-02 10:14:03 -07:00
Aleix Conchillo Flaqué
3147534e86 update CHANGELOG for 0.0.36 2024-07-02 10:13:26 -07:00
Aleix Conchillo Flaqué
be5603bf16 examples: fix 06a-image-sync.py 2024-07-02 10:11:50 -07:00
Aleix Conchillo Flaqué
b9b0bcdcbd services(azure): close the audio stream on exit 2024-07-02 10:11:35 -07:00
Aleix Conchillo Flaqué
5bcece56f3 services(cartesia): make sure we close the client on exit 2024-07-02 10:11:16 -07:00
Aleix Conchillo Flaqué
d67faef88c pyproject: multiple library updates 2024-07-02 09:05:37 -07:00
Aleix Conchillo Flaqué
8f6db5e905 Merge pull request #279 from pipecat-ai/aleix/gladia-stt-support
add Gladia STT support
2024-07-02 08:07:35 -07:00
Aleix Conchillo Flaqué
82e93a0560 use exclude_none=True when dumping BaseModels 2024-07-02 08:03:31 -07:00
Aleix Conchillo Flaqué
a9a82c083b services: add GladiaSTTService support 2024-07-02 08:03:29 -07:00
Aleix Conchillo Flaqué
974d9c33ed Merge pull request #278 from pipecat-ai/aleix/detect-user-idle
add support for detecting user idle
2024-07-02 08:01:27 -07:00
Jon Taylor
c1957ab694 Merge pull request #274 from pipecat-ai/jpt/deployment-examples
Example deployment pattern for fly.io
2024-07-02 10:17:13 +01:00
Jon Taylor
b20a10a4bc fixed double fly 2024-07-02 10:17:01 +01:00
Aleix Conchillo Flaqué
be14ce465d transports(daily): make sure we don't send data if client is closed 2024-07-01 18:26:13 -07:00
Aleix Conchillo Flaqué
d1ca0c5614 examples: added new 17-detect-user-idle.py 2024-07-01 18:17:43 -07:00
Aleix Conchillo Flaqué
535514f506 processors: added new UserIdleProcessor 2024-07-01 18:17:43 -07:00
Aleix Conchillo Flaqué
933b63cf13 processors: added new IdleFrameProcessor 2024-07-01 14:57:42 -07:00
Aleix Conchillo Flaqué
d7c3e380a5 added BotSpeakingFrame 2024-07-01 14:57:18 -07:00
Aleix Conchillo Flaqué
c5298f78cb add more missing keyword-only arguments 2024-07-01 12:34:53 -07:00
Jon Taylor
4f8f7b8d1d added on_call_state event to prevent idle vms 2024-07-01 19:21:16 +01:00
Aleix Conchillo Flaqué
d7d46919ac update macos-py3.10-requirements.txt 2024-07-01 11:00:59 -07:00
Aleix Conchillo Flaqué
e5d73d2e2e update linux-py3.10-requirements.txt 2024-07-01 10:58:49 -07:00
Aleix Conchillo Flaqué
b145e8ec90 update README with XTTS 2024-07-01 10:49:43 -07:00
Aleix Conchillo Flaqué
97ff4a1fb8 Merge pull request #275 from pipecat-ai/aleix/add-missing-keyword-separators
add missing keyword separators
2024-07-01 10:45:31 -07:00
Aleix Conchillo Flaqué
5018a552c1 services(xtts): no need the WAV header 2024-07-01 10:44:32 -07:00
Aleix Conchillo Flaqué
7f9fd9ffce examples: added 07i-interruptible-xtts 2024-07-01 10:41:34 -07:00
Aleix Conchillo Flaqué
ddd0ca6a8f update CHANGELOG 2024-07-01 10:27:26 -07:00
Aleix Conchillo Flaqué
06f817c7e3 transport(websocket): don't send if serializer returns None 2024-07-01 10:27:26 -07:00
Aleix Conchillo Flaqué
df4c3e56c4 services: add missing * keyword separator 2024-07-01 10:27:26 -07:00
Aleix Conchillo Flaqué
9d5c2b9656 Merge pull request #276 from eddieoz/feature/xtts
Added service XTTS
2024-07-01 10:26:53 -07:00
eddieoz
7ce59c5e2e added service xtts 2024-07-01 20:17:19 +03:00
Aleix Conchillo Flaqué
1c9631fc78 Merge pull request #271 from pipecat-ai/aleix/silero-vad-version
vad(silero): allow specifying a Silero VAD version
2024-07-01 09:39:59 -07:00
Aleix Conchillo Flaqué
efbe7297f7 vad(silero): allow specifying a Silero VAD version 2024-07-01 09:38:43 -07:00
Aleix Conchillo Flaqué
1b45946a61 Merge pull request #270 from pipecat-ai/aleix/async-frame-processor
add new AsyncFrameProcessor and AsyncAIService
2024-07-01 09:37:51 -07:00
Aleix Conchillo Flaqué
cbf5a6362c add new AsyncFrameProcessor and AsyncAIService 2024-07-01 09:37:02 -07:00
Aleix Conchillo Flaqué
583b96c341 Merge pull request #269 from pipecat-ai/aleix/improve-error-handling
improve error handling and don't swallow exceptions
2024-07-01 09:36:00 -07:00
Aleix Conchillo Flaqué
fc0920504d improve error handling and don't swallow exceptions 2024-07-01 09:35:45 -07:00
Aleix Conchillo Flaqué
abd65a93b2 Merge pull request #268 from pipecat-ai/aleix/websocket-dont-send-if-closed
transports(websocket): don't send data if websocket closed
2024-07-01 09:33:45 -07:00
Aleix Conchillo Flaqué
c3244fdd7a transports(websocket): don't send data if websocket closed 2024-07-01 09:31:58 -07:00
Aleix Conchillo Flaqué
e8f58938b0 Merge pull request #267 from pipecat-ai/aleix/processing-metrics
add support for processing metrics
2024-07-01 09:31:05 -07:00
Jon Taylor
602b4f34b1 added example fly.toml 2024-07-01 16:50:53 +01:00
Jon Taylor
0399c84dfa added flyio deployment example 2024-07-01 16:46:38 +01:00
Aleix Conchillo Flaqué
fd5d879bf5 add support for processing metrics
Processing metrics indicate how much time a processor takes to generate all of
its output.
2024-06-28 14:26:57 -07:00
Aleix Conchillo Flaqué
8dff460307 Merge pull request #266 from pipecat-ai/aleix/silero-num-frames-fixes
vad: fix Silero VAD required number of frames
2024-06-28 11:25:55 -07:00
Aleix Conchillo Flaqué
cce1ddb183 vad: fix Silero VAD required number of frames 2024-06-28 10:45:48 -07:00
Aleix Conchillo Flaqué
8691d14289 Merge pull request #255 from Viking5274/main
Fix twilio error
2024-06-26 10:17:03 -07:00
daniil5701133
dd402da9e5 added handling streamSid after first wss connect
fixx name
2024-06-26 18:56:30 +03:00
Aleix Conchillo Flaqué
2fd04248f1 examples(storytelling-chatbot): upgrade npm vulnerabilities 2024-06-25 22:04:55 -07:00
Aleix Conchillo Flaqué
0ac42006f8 Merge pull request #260 from pipecat-ai/aleix/more-interruption-fixes
more interruption fixes
2024-06-25 21:52:02 -07:00
Aleix Conchillo Flaqué
66e331248d update CHANGELOG for 0.0.34 2024-06-25 21:43:23 -07:00
Aleix Conchillo Flaqué
4be3e8c87d aggregators: revert using intermediate results 2024-06-25 21:33:17 -07:00
Aleix Conchillo Flaqué
dac033fe61 services(azure): allow transcriptions during interruptions
If the user interrupts we can't just discard transcriptions because the user is
actually interrupting and talking.
2024-06-25 21:33:06 -07:00
Aleix Conchillo Flaqué
d302cbb114 services(deepgram): allow transcriptions during interruptions
If the user interrupts we can't just discard transcriptions because the user is
actually interrupting and talking.
2024-06-25 21:32:21 -07:00
Aleix Conchillo Flaqué
e3b407db28 Merge pull request #259 from pipecat-ai/aleix/prepare-0.0.33
update CHANGELOG for 0.0.33
2024-06-25 12:05:07 -07:00
Aleix Conchillo Flaqué
4ef623f09e update CHANGELOG for 0.0.33 2024-06-25 11:53:07 -07:00
Aleix Conchillo Flaqué
253530a63d Merge pull request #258 from pipecat-ai/aleix/upgrade-cartesia-1.0.0
services(cartesia): upgrade to new cartesia 1.0.0
2024-06-25 11:52:04 -07:00
Aleix Conchillo Flaqué
4f38d989f5 services(cartesia): upgrade to new cartesia 1.0.0 2024-06-25 11:51:34 -07:00
Aleix Conchillo Flaqué
84074e90ee Merge pull request #257 from pipecat-ai/aleix/cancel-all-tasks-when-interrutpted
cancel all tasks when interrutpted
2024-06-25 11:16:00 -07:00
Aleix Conchillo Flaqué
38aee7d8f2 services(azure): cancel tasks when interrupted and ignore incoming transcriptions 2024-06-25 11:15:26 -07:00
Aleix Conchillo Flaqué
64198313c6 services(deepgram): cancel tasks when interrupted and ignore incoming transcriptions 2024-06-25 11:15:07 -07:00
Aleix Conchillo Flaqué
d61b6c301c transports(base_input): create push tasks after pushing interruption 2024-06-25 11:15:07 -07:00
Aleix Conchillo Flaqué
83d1931266 Merge pull request #256 from pipecat-ai/aleix/tts-cleanup-when-interrupted
services(tts): strip before TTS and cleanup when interrupted
2024-06-25 11:14:32 -07:00
Aleix Conchillo Flaqué
c31f2ab285 services(tts): strip before TTS and cleanup when interrupted 2024-06-25 11:13:19 -07:00
Aleix Conchillo Flaqué
0ddc5721b4 Merge pull request #252 from pipecat-ai/aleix/daily-check-size-read-audio-frames
transports(daily): always check size of read audio frames
2024-06-25 09:45:05 -07:00
Aleix Conchillo Flaqué
98bd183bc4 pyproject: fix cartesia version and update requirements files 2024-06-25 09:43:54 -07:00
Aleix Conchillo Flaqué
aaa154524c Merge pull request #253 from pipecat-ai/aleix/llm-response-use-intermediate-results
aggregators: uses intermediate results for LLMAssistantResponseAggreg…
2024-06-24 19:21:14 -07:00
Aleix Conchillo Flaqué
beced68337 aggregators: uses intermediate results for LLMAssistantResponseAggregator 2024-06-24 17:33:45 -07:00
Aleix Conchillo Flaqué
94823ab952 transports(daily): always check size of read audio frames 2024-06-24 14:56:24 -07:00
59 changed files with 1705 additions and 298 deletions

View File

@@ -5,6 +5,111 @@ All notable changes to **pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.0.36] - 2024-07-02
### Added
- Added `GladiaSTTService`.
See https://docs.gladia.io/chapters/speech-to-text-api/pages/live-speech-recognition
- Added `XTTSService`. This is a local Text-To-Speech service.
See https://github.com/coqui-ai/TTS
- Added `UserIdleProcessor`. This processor can be used to wait for any
interaction with the user. If the user doesn't say anything within a given
timeout a provided callback is called.
- Added `IdleFrameProcessor`. This processor can be used to wait for frames
within a given timeout. If no frame is received within the timeout a provided
callback is called.
- Added new frame `BotSpeakingFrame`. This frame will be continuously pushed
upstream while the bot is talking.
- It is now possible to specify a Silero VAD version when using `SileroVADAnalyzer`
or `SileroVAD`.
- Added `AysncFrameProcessor` and `AsyncAIService`. Some services like
`DeepgramSTTService` need to process things asynchronously. For example, audio
is sent to Deepgram but transcriptions are not returned immediately. In these
cases we still require all frames (except system frames) to be pushed
downstream from a single task. That's what `AsyncFrameProcessor` is for. It
creates a task and all frames should be pushed from that task. So, whenever a
new Deepgram transcription is ready that transcription will also be pushed
from this internal task.
- The `MetricsFrame` now includes processing metrics if metrics are enabled. The
processing metrics indicate the time a processor needs to generate all its
output. Note that not all processors generate these kind of metrics.
### Changed
- `WhisperSTTService` model can now also be a string.
- Added missing * keyword separators in services.
### Fixed
- `WebsocketServerTransport` doesn't try to send frames anymore if serializers
returns `None`.
- Fixed an issue where exceptions that occurred inside frame processors were
being swallowed and not displayed.
- Fixed an issue in `FastAPIWebsocketTransport` where it would still try to send
data to the websocket after being closed.
### Other
- Added Fly.io deployment example in `examples/deployment/flyio-example`.
- Added new `17-detect-user-idle.py` example that shows how to use the new
`UserIdleProcessor`.
## [0.0.35] - 2024-06-28
### Changed
- `FastAPIWebsocketParams` now require a serializer.
- `TwilioFrameSerializer` now requires a `streamSid`.
### Fixed
- Silero VAD number of frames needs to be 512 for 16000 sample rate or 256 for
8000 sample rate.
## [0.0.34] - 2024-06-25
### Fixed
- Fixed an issue with asynchronous STT services (Deepgram and Azure) that could
interruptions to ignore transcriptions.
- Fixed an issue introduced in 0.0.33 that would cause the LLM to generate
shorter output.
## [0.0.33] - 2024-06-25
### Changed
- Upgraded to Cartesia's new Python library 1.0.0. `CartesiaTTSService` now
expects a voice ID instead of a voice name (you can get the voice ID from
Cartesia's playground). You can also specify the audio `sample_rate` and
`encoding` instead of the previous `output_format`.
### Fixed
- Fixed an issue with asynchronous STT services (Deepgram and Azure) that could
cause static audio issues and interruptions to not work properly when dealing
with multiple LLMs sentences.
- Fixed an issue that could mix new LLM responses with previous ones when
handling interruptions.
- Fixed a Daily transport blocking situation that occurred while reading audio
frames after a participant left the room. Needs daily-python >= 0.10.1.
## [0.0.32] - 2024-06-22
### Added

View File

@@ -39,7 +39,7 @@ pip install "pipecat-ai[option,...]"
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
- **AI services**: `anthropic`, `azure`, `deepgram`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`
- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **Transports**: `local`, `websocket`, `daily`
## Code examples

View File

@@ -27,6 +27,9 @@ FAL_KEY=...
# Fireworks
FIREWORKS_API_KEY=...
# Gladia
GLADIA_API_KEY=...
# PlayHT
PLAY_HT_USER_ID=...
PLAY_HT_API_KEY=...

View File

@@ -0,0 +1,16 @@
FROM python:3.11-bullseye
# Open port 7860 for http service
ENV FAST_API_PORT=7860
EXPOSE 7860
# Install Python dependencies
COPY *.py .
COPY ./requirements.txt requirements.txt
RUN pip3 install --no-cache-dir --upgrade -r requirements.txt
# Install models
RUN python3 install_deps.py
# Start the FastAPI server
CMD python3 bot_runner.py --port ${FAST_API_PORT}

View File

@@ -0,0 +1,43 @@
# Fly.io deployment example
This project modifies the `bot_runner.py` server to launch a new machine for each user session. This is a recommended approach for production vs. running shell processess as your deployment will quickly run out of system resources under load.
To speed up machine boot times, we also download and cache Silero VAD as part of the Dockerfile (`install_deps.py`). If you are using other custom models, you can add them here too.
For this example, we are using Daily as a WebRTC transport and provisioning a new room and token for each session. You can use another transport, such as WebSockets, by modifying the `bot.py` and `bot_runner.py` files accordingly.
## Setting up your fly.io deployment
### Create your fly.toml file
You can copy the `example-fly.toml` as a reference. Be sure to change the app name to something unique.
### Create your .env file
Copy the base `env.example` to `.env` and enter the necessary API keys.
`FLY_APP_NAME` should match that in the `fly.toml` file.
### Launch a new fly.io project
`fly launch` or `fly launch --org your-org-name`
### Set the necessary app secrets from your .env
Note: you can do this manually via the fly.io dashboard under the "secrets" sub-section of your deployment (e.g. "https://fly.io/apps/fly-app-name/secrets") or run the following terminal command:
`cat .env | tr '\n' ' ' | xargs flyctl secrets set`
### Deploy your machine
`fly deploy`
## Connecting to your bot
Send a post request to your running fly.io instance:
`curl --location --request POST 'https://YOUR_FLY_APP_NAME/start_bot'`
This request will wait until the machine enters into a `starting` state, before returning the a room URL and token to join.

View File

@@ -0,0 +1,103 @@
import asyncio
import aiohttp
import os
import sys
import argparse
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
async def main(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Bot")
parser.add_argument("-u", type=str, help="Room URL")
parser.add_argument("-t", type=str, help="Token")
config = parser.parse_args()
asyncio.run(main(config.u, config.t))

View File

@@ -0,0 +1,199 @@
import os
import argparse
import subprocess
import requests
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
load_dotenv(override=True)
# ------------ Configuration ------------ #
MAX_SESSION_TIME = 5 * 60 # 5 minutes
REQUIRED_ENV_VARS = [
'DAILY_API_KEY',
'OPENAI_API_KEY',
'ELEVENLABS_API_KEY',
'ELEVENLABS_VOICE_ID',
'FLY_API_KEY',
'FLY_APP_NAME',]
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "pipecat-fly-example")
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
FLY_HEADERS = {
'Authorization': f"Bearer {FLY_API_KEY}",
'Content-Type': 'application/json'
}
daily_rest_helper = DailyRESTHelper(
os.getenv("DAILY_API_KEY", ""),
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
# ----------------- API ----------------- #
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)
# ----------------- Main ----------------- #
def spawn_fly_machine(room_url: str, token: str):
# Use the same image as the bot runner
res = requests.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS)
if res.status_code != 200:
raise Exception(f"Unable to get machine info from Fly: {res.text}")
image = res.json()[0]['config']['image']
# Machine configuration
cmd = f"python3 bot.py -u {room_url} -t {token}"
cmd = cmd.split()
worker_props = {
"config": {
"image": image,
"auto_destroy": True,
"init": {
"cmd": cmd
},
"restart": {
"policy": "no"
},
"guest": {
"cpu_kind": "shared",
"cpus": 1,
"memory_mb": 1024
}
},
}
# Spawn a new machine instance
res = requests.post(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines",
headers=FLY_HEADERS,
json=worker_props)
if res.status_code != 200:
raise Exception(f"Problem starting a bot worker: {res.text}")
# Wait for the machine to enter the started state
vm_id = res.json()['id']
res = requests.get(
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
headers=FLY_HEADERS)
if res.status_code != 200:
raise Exception(f"Bot was unable to enter started state: {res.text}")
print(f"Machine joined room: {room_url}")
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
try:
data = await request.json()
# Is this a webhook creation request?
if "test" in data:
return JSONResponse({"test": True})
except Exception as e:
pass
# Use specified room URL, or create a new one if not specified
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
if not room_url:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
try:
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
else:
# Check passed room URL exists, we should assume that it already has a sip set up
try:
room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
except Exception:
raise HTTPException(
status_code=500, detail=f"Room not found: {room_url}")
# Give the agent a token to join the session
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
# Launch a new fly.io machine, or run as a shell process (not recommended)
run_as_process = os.getenv("RUN_AS_PROCESS", False)
if run_as_process:
try:
subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
else:
try:
spawn_fly_machine(room.url, token)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to spawn VM: {e}")
# Grab a token for the user to join with
user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
if __name__ == "__main__":
# Check environment variables
for env_var in REQUIRED_ENV_VARS:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=False, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")

View File

@@ -0,0 +1,8 @@
DAILY_API_KEY=
DAILY_SAMPLE_ROOM_URL= # Enter a Daily room URL to use a set room URL each time (useful for local testing)
OPENAI_API_KEY=
ELEVENLABS_API_KEY=
ELEVENLABS_VOICE_ID=
FLY_API_KEY=
FLY_APP_NAME=
RUN_AS_PROCESS= # Spawn fly.io machine for each session or run as local process

View File

@@ -0,0 +1,25 @@
# fly.toml app configuration file generated for pipecat-fly-example on 2024-07-01T15:04:53+01:00
#
# See https://fly.io/docs/reference/configuration/ for information about how to use this file.
#
app = 'pipecat-fly-example'
primary_region = 'sjc'
[build]
[env]
FLY_APP_NAME = 'pipecat-fly-example'
[http_service]
internal_port = 7860
force_https = true
auto_stop_machines = true
auto_start_machines = true
min_machines_running = 0
processes = ['app']
[[vm]]
memory = 512
cpu_kind = 'shared'
cpus = 1

View File

@@ -0,0 +1,4 @@
import torch
# Download (cache) the Silero VAD model
torch.hub.load(repo_or_dir='snakers4/silero-vad', model='silero_vad', force_reload=True)

View File

@@ -0,0 +1,6 @@
pipecat-ai[daily,openai,silero]
fastapi
uvicorn
requests
python-dotenv
loguru

View File

@@ -67,11 +67,12 @@ async def main(room_url: str, token):
"Respond bot",
DailyParams(
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_width=1024,
camera_out_height=1024,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
vad_analyzer=SileroVADAnalyzer(),
)
)
@@ -116,7 +117,7 @@ async def main(room_url: str, token):
async def on_first_participant_joined(transport, participant):
participant_name = participant["info"]["userName"] or ''
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([TextFrame(f"Hi, this is {participant_name}.")])
await task.queue_frames([TextFrame(f"Hi there {participant_name}!")])
runner = PipelineRunner()

View File

@@ -38,7 +38,6 @@ async def main(room_url: str, token):
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=44100,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
@@ -47,8 +46,7 @@ async def main(room_url: str, token):
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="British Lady",
output_format="pcm_44100"
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
)
llm = OpenAILLMService(

View File

@@ -0,0 +1,96 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.xtts import XTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
)
)
tts = XTTSService(
aiohttp_session=session,
voice_id="Claribel Dervla",
language="en",
base_url="http://localhost:8000"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -0,0 +1,101 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.xtts import XTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
)
)
stt = GladiaSTTService(
api_key=os.getenv("GLADIA_API_KEY"),
)
tts = DeepgramTTSService(
aiohttp_session=session,
api_key=os.getenv("DEEPGRAM_API_KEY"),
voice="aura-helios-en"
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -66,7 +66,6 @@ async def main(room_url: str, token):
"Pipecat",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=44100,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
@@ -75,20 +74,17 @@ async def main(room_url: str, token):
news_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="Newslady",
output_format="pcm_44100"
voice_id="bf991597-6c13-47e4-8411-91ec2de5c466", # Newslady
)
british_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="British Lady",
output_format="pcm_44100"
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
barbershop_man = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_name="Barbershop Man",
output_format="pcm_44100"
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
)
llm = OpenAILLMService(

View File

@@ -0,0 +1,108 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY"),
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
async def user_idle_callback(user_idle: UserIdleProcessor):
messages.append(
{"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."})
await user_idle.queue_frame(LLMMessagesFrame(messages))
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
pipeline = Pipeline([
transport.input(), # Transport user input
user_idle, # Idle user check-in
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out # Assistant spoken responses
])
task = PipelineTask(pipeline, PipelineParams(
allow_interruptions=True,
enable_metrics=True,
report_only_initial_ttfb=True,
))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
(url, token) = configure()
asyncio.run(main(url, token))

View File

@@ -899,11 +899,11 @@ brace-expansion@^2.0.1:
balanced-match "^1.0.0"
braces@^3.0.2, braces@~3.0.2:
version "3.0.2"
resolved "https://registry.yarnpkg.com/braces/-/braces-3.0.2.tgz#3454e1a462ee8d599e236df336cd9ea4f8afe107"
integrity sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==
version "3.0.3"
resolved "https://registry.yarnpkg.com/braces/-/braces-3.0.3.tgz#490332f40919452272d55a8480adc0c441358789"
integrity "sha1-SQMy9AkZRSJy1VqEgK3AxEE1h4k= sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA=="
dependencies:
fill-range "^7.0.1"
fill-range "^7.1.1"
browserslist@^4.23.0:
version "4.23.0"
@@ -1551,10 +1551,10 @@ file-entry-cache@^6.0.1:
dependencies:
flat-cache "^3.0.4"
fill-range@^7.0.1:
version "7.0.1"
resolved "https://registry.yarnpkg.com/fill-range/-/fill-range-7.0.1.tgz#1919a6a7c75fe38b2c7c77e5198535da9acdda40"
integrity sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==
fill-range@^7.1.1:
version "7.1.1"
resolved "https://registry.yarnpkg.com/fill-range/-/fill-range-7.1.1.tgz#44265d3cac07e3ea7dc247516380643754a05292"
integrity "sha1-RCZdPKwH4+p9wkdRY4BkN1SgUpI= sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg=="
dependencies:
to-regex-range "^5.0.1"

View File

@@ -15,6 +15,7 @@ from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketTransport, FastAPIWebsocketParams
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.serializers.twilio import TwilioFrameSerializer
from loguru import logger
@@ -25,7 +26,7 @@ logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def run_bot(websocket_client):
async def run_bot(websocket_client, stream_sid):
async with aiohttp.ClientSession() as session:
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
@@ -34,7 +35,8 @@ async def run_bot(websocket_client):
add_wav_header=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True
vad_audio_passthrough=True,
serializer=TwilioFrameSerializer(stream_sid)
)
)

View File

@@ -1,3 +1,5 @@
import json
import uvicorn
from fastapi import FastAPI, WebSocket
@@ -26,8 +28,13 @@ async def start_call():
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
start_data = websocket.iter_text()
await start_data.__anext__()
call_data = json.loads(await start_data.__anext__())
print(call_data, flush=True)
stream_sid = call_data['start']['streamSid']
print("WebSocket connection accepted")
await run_bot(websocket)
await run_bot(websocket, stream_sid)
if __name__ == "__main__":

View File

@@ -4,7 +4,7 @@
#
# pip-compile --all-extras pyproject.toml
#
aiofiles==23.2.1
aiofiles==24.1.0
# via deepgram-sdk
aiohttp==3.9.5
# via
@@ -17,7 +17,7 @@ aiosignal==1.3.1
# via aiohttp
annotated-types==0.7.0
# via pydantic
anthropic==0.25.9
anthropic==0.28.1
# via
# openpipe
# pipecat-ai (pyproject.toml)
@@ -36,23 +36,21 @@ attrs==23.2.0
# via
# aiohttp
# openpipe
av==12.1.0
av==12.2.0
# via faster-whisper
azure-cognitiveservices-speech==1.37.0
azure-cognitiveservices-speech==1.38.0
# via pipecat-ai (pyproject.toml)
blinker==1.8.2
# via flask
cachetools==5.3.3
# via google-auth
cartesia==0.1.1
cartesia==1.0.3
# via pipecat-ai (pyproject.toml)
certifi==2024.6.2
# via
# httpcore
# httpx
# requests
cffi==1.16.0
# via sounddevice
charset-normalizer==3.3.2
# via requests
click==8.1.7
@@ -64,7 +62,7 @@ coloredlogs==15.0.1
# via onnxruntime
ctranslate2==4.3.1
# via faster-whisper
daily-python==0.10.0
daily-python==0.10.1
# via pipecat-ai (pyproject.toml)
dataclasses-json==0.6.7
# via
@@ -86,15 +84,15 @@ exceptiongroup==1.2.1
# via
# anyio
# pytest
fal-client==0.4.0
fal-client==0.4.1
# via pipecat-ai (pyproject.toml)
fastapi==0.111.0
# via pipecat-ai (pyproject.toml)
fastapi-cli==0.0.4
# via fastapi
faster-whisper==1.0.2
faster-whisper==1.0.3
# via pipecat-ai (pyproject.toml)
filelock==3.15.3
filelock==3.15.4
# via
# huggingface-hub
# pyht
@@ -113,22 +111,22 @@ frozenlist==1.4.1
# via
# aiohttp
# aiosignal
fsspec==2024.6.0
fsspec==2024.6.1
# via
# huggingface-hub
# torch
future==1.0.0
# via pyloudnorm
google-ai-generativelanguage==0.6.4
google-ai-generativelanguage==0.6.6
# via google-generativeai
google-api-core[grpc]==2.19.0
google-api-core[grpc]==2.19.1
# via
# google-ai-generativelanguage
# google-api-python-client
# google-generativeai
google-api-python-client==2.134.0
google-api-python-client==2.135.0
# via google-generativeai
google-auth==2.30.0
google-auth==2.31.0
# via
# google-ai-generativelanguage
# google-api-core
@@ -137,9 +135,9 @@ google-auth==2.30.0
# google-generativeai
google-auth-httplib2==0.2.0
# via google-api-python-client
google-generativeai==0.5.4
google-generativeai==0.7.1
# via pipecat-ai (pyproject.toml)
googleapis-common-protos==1.63.1
googleapis-common-protos==1.63.2
# via
# google-api-core
# grpcio-status
@@ -199,31 +197,35 @@ jinja2==3.1.4
# fastapi
# flask
# torch
jiter==0.5.0
# via anthropic
jsonpatch==1.33
# via langchain-core
jsonpointer==3.0.0
# via jsonpatch
langchain==0.2.5
langchain==0.2.6
# via
# langchain-community
# pipecat-ai (pyproject.toml)
langchain-community==0.2.5
langchain-community==0.2.6
# via pipecat-ai (pyproject.toml)
langchain-core==0.2.9
langchain-core==0.2.10
# via
# langchain
# langchain-community
# langchain-openai
# langchain-text-splitters
langchain-openai==0.1.9
langchain-openai==0.1.10
# via pipecat-ai (pyproject.toml)
langchain-text-splitters==0.2.1
langchain-text-splitters==0.2.2
# via langchain
langsmith==0.1.81
langsmith==0.1.83
# via
# langchain
# langchain-community
# langchain-core
llvmlite==0.43.0
# via numba
loguru==0.7.2
# via pipecat-ai (pyproject.toml)
markdown-it-py==3.0.0
@@ -246,14 +248,18 @@ mypy-extensions==1.0.0
# via typing-inspect
networkx==3.3
# via torch
numba==0.60.0
# via resampy
numpy==1.26.4
# via
# ctranslate2
# langchain
# langchain-community
# numba
# onnxruntime
# pipecat-ai (pyproject.toml)
# pyloudnorm
# resampy
# scipy
# torchvision
# transformers
@@ -282,20 +288,20 @@ nvidia-cusparse-cu12==12.1.0.106
# torch
nvidia-nccl-cu12==2.20.5
# via torch
nvidia-nvjitlink-cu12==12.5.40
nvidia-nvjitlink-cu12==12.5.82
# via
# nvidia-cusolver-cu12
# nvidia-cusparse-cu12
nvidia-nvtx-cu12==12.1.105
# via torch
onnxruntime==1.18.0
onnxruntime==1.18.1
# via faster-whisper
openai==1.26.0
openai==1.27.0
# via
# langchain-openai
# openpipe
# pipecat-ai (pyproject.toml)
openpipe==4.14.0
openpipe==4.16.0
# via pipecat-ai (pyproject.toml)
orjson==3.10.5
# via
@@ -338,9 +344,7 @@ pyasn1-modules==0.4.0
# via google-auth
pyaudio==0.2.14
# via pipecat-ai (pyproject.toml)
pycparser==2.22
# via cffi
pydantic==2.7.4
pydantic==2.8.0
# via
# anthropic
# fastapi
@@ -349,7 +353,7 @@ pydantic==2.7.4
# langchain-core
# langsmith
# openai
pydantic-core==2.18.4
pydantic-core==2.20.0
# via pydantic
pygments==2.18.0
# via rich
@@ -396,6 +400,8 @@ requests==2.32.3
# pyht
# tiktoken
# transformers
resampy==0.4.3
# via pipecat-ai (pyproject.toml)
rich==13.7.1
# via typer
rsa==4.9
@@ -404,7 +410,7 @@ safetensors==0.4.3
# via
# timm
# transformers
scipy==1.13.1
scipy==1.14.0
# via pyloudnorm
shellingham==1.5.4
# via typer
@@ -416,8 +422,6 @@ sniffio==1.3.1
# anyio
# httpx
# openai
sounddevice==0.4.7
# via pipecat-ai (pyproject.toml)
sqlalchemy==2.0.31
# via
# langchain
@@ -428,7 +432,7 @@ sympy==1.12.1
# via
# onnxruntime
# torch
tenacity==8.4.1
tenacity==8.4.2
# via
# langchain
# langchain-community

View File

@@ -1,10 +1,10 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --all-extras pyproject.toml
#
aiofiles==23.2.1
aiofiles==24.1.0
# via deepgram-sdk
aiohttp==3.9.5
# via
@@ -17,7 +17,7 @@ aiosignal==1.3.1
# via aiohttp
annotated-types==0.7.0
# via pydantic
anthropic==0.25.9
anthropic==0.28.1
# via
# openpipe
# pipecat-ai (pyproject.toml)
@@ -28,27 +28,29 @@ anyio==4.4.0
# openai
# starlette
# watchfiles
async-timeout==4.0.3
# via
# aiohttp
# langchain
attrs==23.2.0
# via
# aiohttp
# openpipe
av==12.1.0
av==12.2.0
# via faster-whisper
azure-cognitiveservices-speech==1.37.0
azure-cognitiveservices-speech==1.38.0
# via pipecat-ai (pyproject.toml)
blinker==1.8.2
# via flask
cachetools==5.3.3
# via google-auth
cartesia==0.1.1
cartesia==1.0.3
# via pipecat-ai (pyproject.toml)
certifi==2024.6.2
# via
# httpcore
# httpx
# requests
cffi==1.16.0
# via sounddevice
charset-normalizer==3.3.2
# via requests
click==8.1.7
@@ -60,7 +62,7 @@ coloredlogs==15.0.1
# via onnxruntime
ctranslate2==4.3.1
# via faster-whisper
daily-python==0.10.0
daily-python==0.10.1
# via pipecat-ai (pyproject.toml)
dataclasses-json==0.6.7
# via
@@ -78,15 +80,19 @@ einops==0.8.0
# via pipecat-ai (pyproject.toml)
email-validator==2.2.0
# via fastapi
fal-client==0.4.0
exceptiongroup==1.2.1
# via
# anyio
# pytest
fal-client==0.4.1
# via pipecat-ai (pyproject.toml)
fastapi==0.111.0
# via pipecat-ai (pyproject.toml)
fastapi-cli==0.0.4
# via fastapi
faster-whisper==1.0.2
faster-whisper==1.0.3
# via pipecat-ai (pyproject.toml)
filelock==3.15.3
filelock==3.15.4
# via
# huggingface-hub
# pyht
@@ -104,22 +110,22 @@ frozenlist==1.4.1
# via
# aiohttp
# aiosignal
fsspec==2024.6.0
fsspec==2024.6.1
# via
# huggingface-hub
# torch
future==1.0.0
# via pyloudnorm
google-ai-generativelanguage==0.6.4
google-ai-generativelanguage==0.6.6
# via google-generativeai
google-api-core[grpc]==2.19.0
google-api-core[grpc]==2.19.1
# via
# google-ai-generativelanguage
# google-api-python-client
# google-generativeai
google-api-python-client==2.134.0
google-api-python-client==2.135.0
# via google-generativeai
google-auth==2.30.0
google-auth==2.31.0
# via
# google-ai-generativelanguage
# google-api-core
@@ -128,9 +134,9 @@ google-auth==2.30.0
# google-generativeai
google-auth-httplib2==0.2.0
# via google-api-python-client
google-generativeai==0.5.4
google-generativeai==0.7.1
# via pipecat-ai (pyproject.toml)
googleapis-common-protos==1.63.1
googleapis-common-protos==1.63.2
# via
# google-api-core
# grpcio-status
@@ -188,31 +194,35 @@ jinja2==3.1.4
# fastapi
# flask
# torch
jiter==0.5.0
# via anthropic
jsonpatch==1.33
# via langchain-core
jsonpointer==3.0.0
# via jsonpatch
langchain==0.2.5
langchain==0.2.6
# via
# langchain-community
# pipecat-ai (pyproject.toml)
langchain-community==0.2.5
langchain-community==0.2.6
# via pipecat-ai (pyproject.toml)
langchain-core==0.2.9
langchain-core==0.2.10
# via
# langchain
# langchain-community
# langchain-openai
# langchain-text-splitters
langchain-openai==0.1.9
langchain-openai==0.1.10
# via pipecat-ai (pyproject.toml)
langchain-text-splitters==0.2.1
langchain-text-splitters==0.2.2
# via langchain
langsmith==0.1.81
langsmith==0.1.83
# via
# langchain
# langchain-community
# langchain-core
llvmlite==0.43.0
# via numba
loguru==0.7.2
# via pipecat-ai (pyproject.toml)
markdown-it-py==3.0.0
@@ -235,25 +245,29 @@ mypy-extensions==1.0.0
# via typing-inspect
networkx==3.3
# via torch
numba==0.60.0
# via resampy
numpy==1.26.4
# via
# ctranslate2
# langchain
# langchain-community
# numba
# onnxruntime
# pipecat-ai (pyproject.toml)
# pyloudnorm
# resampy
# scipy
# torchvision
# transformers
onnxruntime==1.18.0
onnxruntime==1.18.1
# via faster-whisper
openai==1.26.0
openai==1.27.0
# via
# langchain-openai
# openpipe
# pipecat-ai (pyproject.toml)
openpipe==4.14.0
openpipe==4.16.0
# via pipecat-ai (pyproject.toml)
orjson==3.10.5
# via
@@ -296,9 +310,7 @@ pyasn1-modules==0.4.0
# via google-auth
pyaudio==0.2.14
# via pipecat-ai (pyproject.toml)
pycparser==2.22
# via cffi
pydantic==2.7.4
pydantic==2.8.0
# via
# anthropic
# fastapi
@@ -307,7 +319,7 @@ pydantic==2.7.4
# langchain-core
# langsmith
# openai
pydantic-core==2.18.4
pydantic-core==2.20.0
# via pydantic
pygments==2.18.0
# via rich
@@ -354,6 +366,8 @@ requests==2.32.3
# pyht
# tiktoken
# transformers
resampy==0.4.3
# via pipecat-ai (pyproject.toml)
rich==13.7.1
# via typer
rsa==4.9
@@ -362,7 +376,7 @@ safetensors==0.4.3
# via
# timm
# transformers
scipy==1.13.1
scipy==1.14.0
# via pyloudnorm
shellingham==1.5.4
# via typer
@@ -374,8 +388,6 @@ sniffio==1.3.1
# anyio
# httpx
# openai
sounddevice==0.4.7
# via pipecat-ai (pyproject.toml)
sqlalchemy==2.0.31
# via
# langchain
@@ -386,7 +398,7 @@ sympy==1.12.1
# via
# onnxruntime
# torch
tenacity==8.4.1
tenacity==8.4.2
# via
# langchain
# langchain-community
@@ -400,6 +412,8 @@ tokenizers==0.19.1
# anthropic
# faster-whisper
# transformers
tomli==2.0.1
# via pytest
torch==2.3.1
# via
# pipecat-ai (pyproject.toml)
@@ -423,6 +437,7 @@ typer==0.12.3
typing-extensions==4.12.2
# via
# anthropic
# anyio
# deepgram-sdk
# fastapi
# google-generativeai
@@ -435,6 +450,7 @@ typing-extensions==4.12.2
# torch
# typer
# typing-inspect
# uvicorn
typing-inspect==0.9.0
# via dataclasses-json
ujson==5.10.0

View File

@@ -34,24 +34,26 @@ Source = "https://github.com/pipecat-ai/pipecat"
Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.25.7" ]
azure = [ "azure-cognitiveservices-speech~=1.37.0" ]
cartesia = [ "numpy~=1.26.0", "sounddevice", "cartesia" ]
daily = [ "daily-python~=0.10.0" ]
anthropic = [ "anthropic~=0.28.1" ]
azure = [ "azure-cognitiveservices-speech~=1.38.0" ]
cartesia = [ "cartesia~=1.0.3" ]
daily = [ "daily-python~=0.10.1" ]
deepgram = [ "deepgram-sdk~=3.2.7" ]
examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.0" ]
google = [ "google-generativeai~=0.5.3" ]
fireworks = [ "openai~=1.26.0" ]
langchain = [ "langchain~=0.2.1", "langchain-community~=0.2.1", "langchain-openai~=0.1.8" ]
fal = [ "fal-client~=0.4.1" ]
gladia = [ "websockets~=12.0" ]
google = [ "google-generativeai~=0.7.1" ]
fireworks = [ "openai~=1.27.0" ]
langchain = [ "langchain~=0.2.6", "langchain-community~=0.2.6", "langchain-openai~=0.1.10" ]
local = [ "pyaudio~=0.2.0" ]
moondream = [ "einops~=0.8.0", "timm~=0.9.16", "transformers~=4.40.2" ]
openai = [ "openai~=1.26.0" ]
openpipe = [ "openpipe~=4.14.0" ]
openai = [ "openai~=1.27.0" ]
openpipe = [ "openpipe~=4.16.0" ]
playht = [ "pyht~=0.0.28" ]
silero = [ "torch~=2.3.0", "torchaudio~=2.3.0" ]
silero = [ "torch~=2.3.1", "torchaudio~=2.3.1" ]
websocket = [ "websockets~=12.0", "fastapi~=0.111.0" ]
whisper = [ "faster-whisper~=1.0.2" ]
whisper = [ "faster-whisper~=1.0.3" ]
xtts = [ "resampy~=0.4.3" ]
[tool.setuptools.packages.find]
# All the following settings are optional:

View File

@@ -240,12 +240,23 @@ class StopInterruptionFrame(SystemFrame):
pass
@dataclass
class BotSpeakingFrame(SystemFrame):
"""Emitted by transport outputs while the bot is still speaking. This can be
used, for example, to detect when a user is idle. That is, while the bot is
speaking we don't want to trigger any user idle timeout since the user might
be listening.
"""
pass
@dataclass
class MetricsFrame(SystemFrame):
"""Emitted by processor that can compute metrics like latencies.
"""
ttfb: Mapping[str, float]
ttfb: List[Mapping[str, Any]] | None = None
processing: List[Mapping[str, Any]] | None = None
#
# Control frames

View File

@@ -15,7 +15,7 @@ from loguru import logger
class PipelineRunner:
def __init__(self, name: str | None = None, handle_sigint: bool = True):
def __init__(self, *, name: str | None = None, handle_sigint: bool = True):
self.id: int = obj_id()
self.name: str = name or f"{self.__class__.__name__}#{obj_count(self)}"

View File

@@ -95,8 +95,9 @@ class PipelineTask:
def _initial_metrics_frame(self) -> MetricsFrame:
processors = self._pipeline.processors_with_metrics()
ttfb = dict(zip([p.name for p in processors], [0] * len(processors)))
return MetricsFrame(ttfb=ttfb)
ttfb = [{"name": p.name, "time": 0.0} for p in processors]
processing = [{"name": p.name, "time": 0.0} for p in processors]
return MetricsFrame(ttfb=ttfb, processing=processing)
async def _process_down_queue(self):
start_frame = StartFrame(

View File

@@ -0,0 +1,63 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from pipecat.frames.frames import EndFrame, Frame, StartInterruptionFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class AsyncFrameProcessor(FrameProcessor):
def __init__(
self,
*,
name: str | None = None,
loop: asyncio.AbstractEventLoop | None = None,
**kwargs):
super().__init__(name=name, loop=loop, **kwargs)
self._create_push_task()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
await self._handle_interruptions(frame)
async def queue_frame(
self,
frame: Frame,
direction: FrameDirection = FrameDirection.DOWNSTREAM):
await self._push_queue.put((frame, direction))
async def cleanup(self):
self._push_frame_task.cancel()
await self._push_frame_task
async def _handle_interruptions(self, frame: Frame):
# Cancel the task. This will stop pushing frames downstream.
self._push_frame_task.cancel()
await self._push_frame_task
# Push an out-of-band frame (i.e. not using the ordered push
# frame task).
await self.push_frame(frame)
# Create a new queue and task.
self._create_push_task()
def _create_push_task(self):
self._push_queue = asyncio.Queue()
self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
async def _push_frame_task_handler(self):
running = True
while running:
try:
(frame, direction) = await self._push_queue.get()
await self.push_frame(frame, direction)
running = not isinstance(frame, EndFrame)
except asyncio.CancelledError:
break

View File

@@ -82,5 +82,5 @@ class WakeCheckFilter(FrameProcessor):
await self.push_frame(frame, direction)
except Exception as e:
error_msg = f"Error in wake word filter: {e}"
logger.error(error_msg)
logger.exception(error_msg)
await self.push_error(ErrorFrame(error_msg))

View File

@@ -9,7 +9,7 @@ import time
from enum import Enum
from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame, UserStoppedSpeakingFrame
from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame, StartInterruptionFrame, UserStoppedSpeakingFrame
from pipecat.utils.utils import obj_count, obj_id
from loguru import logger
@@ -20,10 +20,53 @@ class FrameDirection(Enum):
UPSTREAM = 2
class FrameProcessorMetrics:
def __init__(self, name: str):
self._name = name
self._start_ttfb_time = 0
self._start_processing_time = 0
self._should_report_ttfb = True
async def start_ttfb_metrics(self, report_only_initial_ttfb):
if self._should_report_ttfb:
self._start_ttfb_time = time.time()
self._should_report_ttfb = not report_only_initial_ttfb
async def stop_ttfb_metrics(self):
if self._start_ttfb_time == 0:
return None
value = time.time() - self._start_ttfb_time
logger.debug(f"{self._name} TTFB: {value}")
ttfb = {
"processor": self._name,
"value": value
}
self._start_ttfb_time = 0
return MetricsFrame(ttfb=[ttfb])
async def start_processing_metrics(self):
self._start_processing_time = time.time()
async def stop_processing_metrics(self):
if self._start_processing_time == 0:
return None
value = time.time() - self._start_processing_time
logger.debug(f"{self._name} processing time: {value}")
processing = {
"processor": self._name,
"value": value
}
self._start_processing_time = 0
return MetricsFrame(processing=[processing])
class FrameProcessor:
def __init__(
self,
*,
name: str | None = None,
loop: asyncio.AbstractEventLoop | None = None,
**kwargs):
@@ -39,8 +82,7 @@ class FrameProcessor:
self._report_only_initial_ttfb = False
# Metrics
self._start_ttfb_time = 0
self._should_report_ttfb = True
self._metrics = FrameProcessorMetrics(name=self.name)
@property
def interruptions_allowed(self):
@@ -58,16 +100,28 @@ class FrameProcessor:
return False
async def start_ttfb_metrics(self):
if self.metrics_enabled and self._should_report_ttfb:
self._start_ttfb_time = time.time()
self._should_report_ttfb = not self._report_only_initial_ttfb
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)
async def stop_ttfb_metrics(self):
if self.metrics_enabled and self._start_ttfb_time > 0:
ttfb = time.time() - self._start_ttfb_time
logger.debug(f"{self.name} TTFB: {ttfb}")
await self.push_frame(MetricsFrame(ttfb={self.name: ttfb}))
self._start_ttfb_time = 0
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_ttfb_metrics()
if frame:
await self.push_frame(frame)
async def start_processing_metrics(self):
if self.can_generate_metrics() and self.metrics_enabled:
await self._metrics.start_processing_metrics()
async def stop_processing_metrics(self):
if self.can_generate_metrics() and self.metrics_enabled:
frame = await self._metrics.stop_processing_metrics()
if frame:
await self.push_frame(frame)
async def stop_all_metrics(self):
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
async def cleanup(self):
pass
@@ -85,6 +139,8 @@ class FrameProcessor:
self._allow_interruptions = frame.allow_interruptions
self._enable_metrics = frame.enable_metrics
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
elif isinstance(frame, StartInterruptionFrame):
await self.stop_all_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
self._should_report_ttfb = True
@@ -92,12 +148,15 @@ class FrameProcessor:
await self.push_frame(error, FrameDirection.UPSTREAM)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
if direction == FrameDirection.DOWNSTREAM and self._next:
logger.trace(f"Pushing {frame} from {self} to {self._next}")
await self._next.process_frame(frame, direction)
elif direction == FrameDirection.UPSTREAM and self._prev:
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
await self._prev.process_frame(frame, direction)
try:
if direction == FrameDirection.DOWNSTREAM and self._next:
logger.trace(f"Pushing {frame} from {self} to {self._next}")
await self._next.process_frame(frame, direction)
elif direction == FrameDirection.UPSTREAM and self._prev:
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
await self._prev.process_frame(frame, direction)
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
def __str__(self):
return self.name

View File

@@ -75,5 +75,6 @@ class LangchainProcessor(FrameProcessor):
except GeneratorExit:
logger.warning(f"{self} generator was closed prematurely")
except Exception as e:
logger.error(f"{self} an unknown error occurred: {e}")
await self.push_frame(LLMFullResponseEndFrame())
logger.exception(f"{self} an unknown error occurred: {e}")
finally:
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -0,0 +1,76 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import Awaitable, Callable, List
from pipecat.frames.frames import Frame, SystemFrame
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
from pipecat.processors.frame_processor import FrameDirection
class IdleFrameProcessor(AsyncFrameProcessor):
"""This class waits to receive any frame or list of desired frames within a
given timeout. If the timeout is reached before receiving any of those
frames the provided callback will be called.
The callback can then be used to push frames downstream by using
`queue_frame()` (or `push_frame()` for system frames).
"""
def __init__(
self,
*,
callback: Callable[["IdleFrameProcessor"], Awaitable[None]],
timeout: float,
types: List[type] = [],
**kwargs):
super().__init__(**kwargs)
self._callback = callback
self._timeout = timeout
self._types = types
self._create_idle_task()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
else:
await self.queue_frame(frame, direction)
# If we are not waiting for any specific frame set the event, otherwise
# check if we have received one of the desired frames.
if not self._types:
self._idle_event.set()
else:
for t in self._types:
if isinstance(frame, t):
self._idle_event.set()
# If we are not waiting for any specific frame set the event, otherwise
async def cleanup(self):
self._idle_task.cancel()
await self._idle_task
def _create_idle_task(self):
self._idle_event = asyncio.Event()
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
async def _idle_task_handler(self):
while True:
try:
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
except asyncio.TimeoutError:
await self._callback(self)
except asyncio.CancelledError:
break
finally:
self._idle_event.clear()

View File

@@ -0,0 +1,77 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import Awaitable, Callable
from pipecat.frames.frames import BotSpeakingFrame, Frame, StartInterruptionFrame, StopInterruptionFrame, SystemFrame
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
from pipecat.processors.frame_processor import FrameDirection
class UserIdleProcessor(AsyncFrameProcessor):
"""This class is useful to check if the user is interacting with the bot
within a given timeout. If the timeout is reached before any interaction
occurred the provided callback will be called.
The callback can then be used to push frames downstream by using
`queue_frame()` (or `push_frame()` for system frames).
"""
def __init__(
self,
*,
callback: Callable[["UserIdleProcessor"], Awaitable[None]],
timeout: float,
**kwargs):
super().__init__(**kwargs)
self._callback = callback
self._timeout = timeout
self._interrupted = False
self._create_idle_task()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
else:
await self.queue_frame(frame, direction)
# We shouldn't call the idle callback if the user or the bot are speaking.
if isinstance(frame, StartInterruptionFrame):
self._interrupted = True
self._idle_event.set()
elif isinstance(frame, StopInterruptionFrame):
self._interrupted = False
self._idle_event.set()
elif isinstance(frame, BotSpeakingFrame):
self._idle_event.set()
async def cleanup(self):
self._idle_task.cancel()
await self._idle_task
def _create_idle_task(self):
self._idle_event = asyncio.Event()
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
async def _idle_task_handler(self):
while True:
try:
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
except asyncio.TimeoutError:
if not self._interrupted:
await self._callback(self)
except asyncio.CancelledError:
break
finally:
self._idle_event.clear()

View File

@@ -17,8 +17,8 @@ class TwilioFrameSerializer(FrameSerializer):
AudioRawFrame: "audio",
}
def __init__(self):
self._sid = None
def __init__(self, stream_sid: str):
self._stream_sid = stream_sid
def serialize(self, frame: Frame) -> str | bytes | None:
if not isinstance(frame, AudioRawFrame):
@@ -30,7 +30,7 @@ class TwilioFrameSerializer(FrameSerializer):
payload = base64.b64encode(serialized_data).decode("utf-8")
answer = {
"event": "media",
"streamSid": self._sid,
"streamSid": self._stream_sid,
"media": {
"payload": payload
}
@@ -41,9 +41,6 @@ class TwilioFrameSerializer(FrameSerializer):
def deserialize(self, data: str | bytes) -> Frame | None:
message = json.loads(data)
if not self._sid:
self._sid = message["streamSid"] if "streamSid" in message else None
if message["event"] != "media":
return None
else:

View File

@@ -16,13 +16,15 @@ from pipecat.frames.frames import (
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSStartedFrame,
TTSStoppedFrame,
TextFrame,
VisionImageRawFrame,
LLMFullResponseEndFrame,
)
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.audio import calculate_audio_volume
from pipecat.utils.utils import exp_smoothing
@@ -59,6 +61,30 @@ class AIService(FrameProcessor):
await self.push_frame(f)
class AsyncAIService(AsyncFrameProcessor):
def __init__(self, **kwargs):
super().__init__(**kwargs)
async def start(self, frame: StartFrame):
pass
async def stop(self, frame: EndFrame):
pass
async def cancel(self, frame: CancelFrame):
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
await self.start(frame)
elif isinstance(frame, CancelFrame):
await self.cancel(frame)
elif isinstance(frame, EndFrame):
await self.stop(frame)
class LLMService(AIService):
"""This class is a no-op but serves as a base class for LLM services."""
@@ -92,7 +118,7 @@ class LLMService(AIService):
class TTSService(AIService):
def __init__(self, aggregate_sentences: bool = True, **kwargs):
def __init__(self, *, aggregate_sentences: bool = True, **kwargs):
super().__init__(**kwargs)
self._aggregate_sentences: bool = aggregate_sentences
self._current_sentence: str = ""
@@ -114,15 +140,21 @@ class TTSService(AIService):
if self._current_sentence.strip().endswith(
(".", "?", "!")) and not self._current_sentence.strip().endswith(
("Mr,", "Mrs.", "Ms.", "Dr.")):
text = self._current_sentence.strip()
text = self._current_sentence
self._current_sentence = ""
if text:
await self._push_tts_frames(text)
async def _push_tts_frames(self, text: str):
text = text.strip()
if not text:
return
await self.push_frame(TTSStartedFrame())
await self.start_processing_metrics()
await self.process_generator(self.run_tts(text))
await self.stop_processing_metrics()
await self.push_frame(TTSStoppedFrame())
# We send the original text after the audio. This way, if we are
# interrupted, the text is not added to the assistant context.
@@ -133,14 +165,12 @@ class TTSService(AIService):
if isinstance(frame, TextFrame):
await self._process_text_frame(frame)
elif isinstance(frame, EndFrame):
if self._current_sentence:
await self._push_tts_frames(self._current_sentence)
await self.push_frame(frame)
elif isinstance(frame, LLMFullResponseEndFrame):
if self._current_sentence:
await self._push_tts_frames(self._current_sentence.strip())
self._current_sentence = ""
elif isinstance(frame, StartInterruptionFrame):
self._current_sentence = ""
await self.push_frame(frame, direction)
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame):
self._current_sentence = ""
await self._push_tts_frames(self._current_sentence)
await self.push_frame(frame)
else:
await self.push_frame(frame, direction)
@@ -150,6 +180,7 @@ class STTService(AIService):
"""STTService is a base class for speech-to-text services."""
def __init__(self,
*,
min_volume: float = 0.6,
max_silence_secs: float = 0.3,
max_buffer_secs: float = 1.5,
@@ -205,7 +236,9 @@ class STTService(AIService):
self._silence_num_frames = 0
self._wave.close()
self._content.seek(0)
await self.start_processing_metrics()
await self.process_generator(self.run_stt(self._content.read()))
await self.stop_processing_metrics()
(self._content, self._wave) = self._new_wave()
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -238,7 +271,9 @@ class ImageGenService(AIService):
if isinstance(frame, TextFrame):
await self.push_frame(frame, direction)
await self.start_processing_metrics()
await self.process_generator(self.run_image_gen(frame.text))
await self.stop_processing_metrics()
else:
await self.push_frame(frame, direction)
@@ -258,6 +293,8 @@ class VisionService(AIService):
await super().process_frame(frame, direction)
if isinstance(frame, VisionImageRawFrame):
await self.start_processing_metrics()
await self.process_generator(self.run_vision(frame))
await self.stop_processing_metrics()
else:
await self.push_frame(frame, direction)

View File

@@ -41,6 +41,7 @@ class AnthropicLLMService(LLMService):
def __init__(
self,
*,
api_key: str,
model: str = "claude-3-opus-20240229",
max_tokens: int = 1024):
@@ -122,7 +123,7 @@ class AnthropicLLMService(LLMService):
await self.push_frame(LLMResponseEndFrame())
except Exception as e:
logger.error(f"{self} exception: {e}")
logger.exception(f"{self} exception: {e}")
finally:
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -12,9 +12,18 @@ import time
from PIL import Image
from typing import AsyncGenerator
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, ErrorFrame, Frame, StartFrame, SystemFrame, TranscriptionFrame, URLImageRawFrame
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
StartFrame,
SystemFrame,
TranscriptionFrame,
URLImageRawFrame)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService, TTSService, ImageGenService
from pipecat.services.ai_services import AsyncAIService, TTSService, ImageGenService
from pipecat.services.openai import BaseOpenAILLMService
from loguru import logger
@@ -34,7 +43,7 @@ try:
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Azure TTS, you need to `pip install pipecat-ai[azure]`. Also, set `AZURE_SPEECH_API_KEY` and `AZURE_SPEECH_REGION` environment variables.")
"In order to use Azure, you need to `pip install pipecat-ai[azure]`. Also, set `AZURE_SPEECH_API_KEY` and `AZURE_SPEECH_REGION` environment variables.")
raise Exception(f"Missing module: {e}")
@@ -73,7 +82,7 @@ class AzureTTSService(TTSService):
return True
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: {text}")
logger.debug(f"Generating TTS: [{text}]")
await self.start_ttfb_metrics()
@@ -100,7 +109,7 @@ class AzureTTSService(TTSService):
logger.error(f"{self} error: {cancellation_details.error_details}")
class AzureSTTService(AIService):
class AzureSTTService(AsyncAIService):
def __init__(
self,
*,
@@ -123,8 +132,6 @@ class AzureSTTService(AIService):
speech_config=speech_config, audio_config=audio_config)
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._create_push_task()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -140,34 +147,16 @@ class AzureSTTService(AIService):
async def stop(self, frame: EndFrame):
self._speech_recognizer.stop_continuous_recognition_async()
await self._push_queue.put((frame, FrameDirection.DOWNSTREAM))
await self._push_frame_task
self._audio_stream.close()
async def cancel(self, frame: CancelFrame):
self._speech_recognizer.stop_continuous_recognition_async()
self._push_frame_task.cancel()
await self._push_frame_task
def _create_push_task(self):
self._push_queue = asyncio.Queue()
self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
async def _push_frame_task_handler(self):
running = True
while running:
try:
(frame, direction) = await self._push_queue.get()
await self.push_frame(frame, direction)
running = not isinstance(frame, EndFrame)
except asyncio.CancelledError:
break
self._audio_stream.close()
def _on_handle_recognized(self, event):
if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0:
direction = FrameDirection.DOWNSTREAM
frame = TranscriptionFrame(event.result.text, "", int(time.time_ns() / 1000000))
asyncio.run_coroutine_threadsafe(
self._push_queue.put((frame, direction)), self.get_event_loop())
asyncio.run_coroutine_threadsafe(self.queue_frame(frame), self.get_event_loop())
class AzureImageGenServiceREST(ImageGenService):

View File

@@ -4,11 +4,11 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from cartesia.tts import AsyncCartesiaTTS
from cartesia import AsyncCartesia
from typing import AsyncGenerator
from pipecat.frames.frames import AudioRawFrame, Frame
from pipecat.frames.frames import AudioRawFrame, CancelFrame, EndFrame, Frame, StartFrame
from pipecat.services.ai_services import TTSService
from loguru import logger
@@ -20,44 +20,57 @@ class CartesiaTTSService(TTSService):
self,
*,
api_key: str,
voice_name: str,
model_id: str = "upbeat-moon",
output_format: str = "pcm_16000",
voice_id: str,
model_id: str = "sonic-english",
encoding: str = "pcm_s16le",
sample_rate: int = 16000,
**kwargs):
super().__init__(**kwargs)
self._api_key = api_key
self._voice_name = voice_name
self._voice_id = voice_id
self._model_id = model_id
self._output_format = output_format
try:
self._client = AsyncCartesiaTTS(api_key=self._api_key)
voices = self._client.get_voices()
voice_id = voices[self._voice_name]["id"]
self._voice = self._client.get_voice_embedding(voice_id=voice_id)
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._output_format = {
"container": "raw",
"encoding": encoding,
"sample_rate": sample_rate,
}
self._client = None
def can_generate_metrics(self) -> bool:
return True
async def start(self, frame: StartFrame):
try:
self._client = AsyncCartesia(api_key=self._api_key)
self._voice = self._client.voices.get(id=self._voice_id)
except Exception as e:
logger.exception(f"{self} initialization error: {e}")
async def stop(self, frame: EndFrame):
if self._client:
await self._client.close()
async def cancel(self, frame: CancelFrame):
if self._client:
await self._client.close()
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
try:
await self.start_ttfb_metrics()
chunk_generator = await self._client.generate(
chunk_generator = await self._client.tts.sse(
stream=True,
transcript=text,
voice=self._voice,
voice_embedding=self._voice["embedding"],
model_id=self._model_id,
output_format=self._output_format,
)
async for chunk in chunk_generator:
await self.stop_ttfb_metrics()
yield AudioRawFrame(chunk["audio"], chunk["sampling_rate"], 1)
yield AudioRawFrame(chunk["audio"], self._output_format["sample_rate"], 1)
except Exception as e:
logger.error(f"{self} exception: {e}")
logger.exception(f"{self} exception: {e}")

View File

@@ -5,7 +5,6 @@
#
import aiohttp
import asyncio
import time
from typing import AsyncGenerator
@@ -21,17 +20,24 @@ from pipecat.frames.frames import (
SystemFrame,
TranscriptionFrame)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService, TTSService
from deepgram import (
DeepgramClient,
DeepgramClientOptions,
LiveTranscriptionEvents,
LiveOptions,
)
from pipecat.services.ai_services import AsyncAIService, TTSService
from loguru import logger
# See .env.example for Deepgram configuration needed
try:
from deepgram import (
DeepgramClient,
DeepgramClientOptions,
LiveTranscriptionEvents,
LiveOptions,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Deepgram, you need to `pip install pipecat-ai[deepgram]`. Also, set `DEEPGRAM_API_KEY` environment variable.")
raise Exception(f"Missing module: {e}")
class DeepgramTTSService(TTSService):
@@ -83,11 +89,12 @@ class DeepgramTTSService(TTSService):
frame = AudioRawFrame(audio=data, sample_rate=16000, num_channels=1)
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
logger.exception(f"{self} exception: {e}")
class DeepgramSTTService(AIService):
class DeepgramSTTService(AsyncAIService):
def __init__(self,
*,
api_key: str,
url: str = "",
live_options: LiveOptions = LiveOptions(
@@ -109,8 +116,6 @@ class DeepgramSTTService(AIService):
self._connection = self._client.listen.asynclive.v("1")
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
self._create_push_task()
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -119,7 +124,7 @@ class DeepgramSTTService(AIService):
elif isinstance(frame, AudioRawFrame):
await self._connection.send(frame.audio)
else:
await self._push_queue.put((frame, direction))
await self.queue_frame(frame, direction)
async def start(self, frame: StartFrame):
if await self._connection.start(self._live_options):
@@ -129,27 +134,9 @@ class DeepgramSTTService(AIService):
async def stop(self, frame: EndFrame):
await self._connection.finish()
await self._push_queue.put((frame, FrameDirection.DOWNSTREAM))
await self._push_frame_task
async def cancel(self, frame: CancelFrame):
await self._connection.finish()
self._push_frame_task.cancel()
await self._push_frame_task
def _create_push_task(self):
self._push_queue = asyncio.Queue()
self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
async def _push_frame_task_handler(self):
running = True
while running:
try:
(frame, direction) = await self._push_queue.get()
await self.push_frame(frame, direction)
running = not isinstance(frame, EndFrame)
except asyncio.CancelledError:
break
async def _on_message(self, *args, **kwargs):
result = kwargs["result"]
@@ -157,6 +144,6 @@ class DeepgramSTTService(AIService):
transcript = result.channel.alternatives[0].transcript
if len(transcript) > 0:
if is_final:
await self._push_queue.put((TranscriptionFrame(transcript, "", int(time.time_ns() / 1000000)), FrameDirection.DOWNSTREAM))
await self.queue_frame(TranscriptionFrame(transcript, "", int(time.time_ns() / 1000000)))
else:
await self._push_queue.put((InterimTranscriptionFrame(transcript, "", int(time.time_ns() / 1000000)), FrameDirection.DOWNSTREAM))
await self.queue_frame(InterimTranscriptionFrame(transcript, "", int(time.time_ns() / 1000000)))

View File

@@ -56,7 +56,7 @@ class FalImageGenService(ImageGenService):
response = await fal_client.run_async(
self._model,
arguments={"prompt": prompt, **self._params.model_dump()}
arguments={"prompt": prompt, **self._params.model_dump(exclude_none=True)}
)
image_url = response["images"][0]["url"] if response else None

View File

@@ -19,6 +19,7 @@ except ModuleNotFoundError as e:
class FireworksLLMService(BaseOpenAILLMService):
def __init__(self,
*,
model: str = "accounts/fireworks/models/firefunction-v1",
base_url: str = "https://api.fireworks.ai/inference/v1"):
super().__init__(model, base_url)

View File

@@ -0,0 +1,115 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import base64
import json
import time
from typing import Optional
from pydantic.main import BaseModel
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
SystemFrame,
TranscriptionFrame)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AsyncAIService
from loguru import logger
# See .env.example for Gladia configuration needed
try:
import websockets
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use Gladia, you need to `pip install pipecat-ai[gladia]`. Also, set `GLADIA_API_KEY` environment variable.")
raise Exception(f"Missing module: {e}")
class GladiaSTTService(AsyncAIService):
class InputParams(BaseModel):
sample_rate: Optional[int] = 16000
language: Optional[str] = "english"
transcription_hint: Optional[str] = None
endpointing: Optional[int] = 200
prosody: Optional[bool] = None
def __init__(self,
*,
api_key: str,
url: str = "wss://api.gladia.io/audio/text/audio-transcription",
confidence: float = 0.5,
params: InputParams = InputParams(),
**kwargs):
super().__init__(**kwargs)
self._api_key = api_key
self._url = url
self._params = params
self._confidence = confidence
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
elif isinstance(frame, AudioRawFrame):
await self._send_audio(frame)
else:
await self.queue_frame(frame, direction)
async def start(self, frame: StartFrame):
self._websocket = await websockets.connect(self._url)
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
await self._setup_gladia()
async def stop(self, frame: EndFrame):
await self._websocket.close()
async def cancel(self, frame: CancelFrame):
await self._websocket.close()
async def _setup_gladia(self):
configuration = {
"x_gladia_key": self._api_key,
"encoding": "WAV/PCM",
"model_type": "fast",
"language_behaviour": "manual",
**self._params.model_dump(exclude_none=True)
}
await self._websocket.send(json.dumps(configuration))
async def _send_audio(self, frame: AudioRawFrame):
message = {
'frames': base64.b64encode(frame.audio).decode("utf-8")
}
await self._websocket.send(json.dumps(message))
async def _receive_task_handler(self):
async for message in self._websocket:
utterance = json.loads(message)
if not utterance:
continue
if "error" in utterance:
message = utterance["message"]
logger.error(f"Gladia error: {message}")
elif "confidence" in utterance:
type = utterance["type"]
confidence = utterance["confidence"]
transcript = utterance["transcription"]
if confidence >= self._confidence:
if type == "final":
await self.queue_frame(TranscriptionFrame(transcript, "", int(time.time_ns() / 1000000)))
else:
await self.queue_frame(InterimTranscriptionFrame(transcript, "", int(time.time_ns() / 1000000)))

View File

@@ -42,7 +42,7 @@ class GoogleLLMService(LLMService):
franca for all LLM services, so that it is easy to switch between different LLMs.
"""
def __init__(self, api_key: str, model: str = "gemini-1.5-flash-latest", **kwargs):
def __init__(self, *, api_key: str, model: str = "gemini-1.5-flash-latest", **kwargs):
super().__init__(**kwargs)
gai.configure(api_key=api_key)
self._client = gai.GenerativeModel(model)
@@ -104,10 +104,10 @@ class GoogleLLMService(LLMService):
logger.debug(
f"LLM refused to generate content for safety reasons - {messages}.")
else:
logger.error(f"{self} error: {e}")
logger.exception(f"{self} error: {e}")
except Exception as e:
logger.error(f"{self} exception: {e}")
logger.exception(f"{self} exception: {e}")
finally:
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -46,6 +46,7 @@ def detect_device():
class MoondreamService(VisionService):
def __init__(
self,
*,
model="vikhyatk/moondream2",
revision="2024-04-02",
use_cpu=False

View File

@@ -9,5 +9,5 @@ from pipecat.services.openai import BaseOpenAILLMService
class OLLamaLLMService(BaseOpenAILLMService):
def __init__(self, model: str = "llama2", base_url: str = "http://localhost:11434/v1"):
def __init__(self, *, model: str = "llama2", base_url: str = "http://localhost:11434/v1"):
super().__init__(model=model, base_url=base_url, api_key="ollama")

View File

@@ -9,7 +9,7 @@ import base64
import io
import json
from typing import Any, AsyncGenerator, List, Literal
from typing import AsyncGenerator, List, Literal
from loguru import logger
from PIL import Image
@@ -53,7 +53,7 @@ except ModuleNotFoundError as e:
raise Exception(f"Missing module: {e}")
class OpenAIUnhandledFunctionException(BaseException):
class OpenAIUnhandledFunctionException(Exception):
pass
@@ -67,7 +67,7 @@ class BaseOpenAILLMService(LLMService):
calls from the LLM.
"""
def __init__(self, model: str, api_key=None, base_url=None, **kwargs):
def __init__(self, *, model: str, api_key=None, base_url=None, **kwargs):
super().__init__(**kwargs)
self._model: str = model
self._client = self.create_client(api_key=api_key, base_url=base_url, **kwargs)
@@ -109,10 +109,7 @@ class BaseOpenAILLMService(LLMService):
del message["data"]
del message["mime_type"]
try:
chunks = await self.get_chat_completions(context, messages)
except Exception as e:
logger.error(f"{self} exception: {e}")
chunks = await self.get_chat_completions(context, messages)
return chunks
@@ -214,7 +211,7 @@ class BaseOpenAILLMService(LLMService):
elif isinstance(result, type(None)):
pass
else:
raise BaseException(f"Unknown return type from function callback: {type(result)}")
raise TypeError(f"Unknown return type from function callback: {type(result)}")
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -231,14 +228,16 @@ class BaseOpenAILLMService(LLMService):
if context:
await self.push_frame(LLMFullResponseStartFrame())
await self.start_processing_metrics()
await self._process_context(context)
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
class OpenAILLMService(BaseOpenAILLMService):
def __init__(self, model="gpt-4o", **kwargs):
super().__init__(model, **kwargs)
def __init__(self, *, model: str = "gpt-4o", **kwargs):
super().__init__(model=model, **kwargs)
class OpenAIImageGenService(ImageGenService):
@@ -334,4 +333,4 @@ class OpenAITTSService(TTSService):
frame = AudioRawFrame(chunk, 24_000, 1)
yield frame
except BadRequestError as e:
logger.error(f"{self} error generating TTS: {e}")
logger.exception(f"{self} error generating TTS: {e}")

View File

@@ -25,6 +25,7 @@ class OpenPipeLLMService(BaseOpenAILLMService):
def __init__(
self,
*,
model: str = "gpt-4o",
api_key: str | None = None,
base_url: str | None = None,
@@ -33,9 +34,9 @@ class OpenPipeLLMService(BaseOpenAILLMService):
tags: Dict[str, str] | None = None,
**kwargs):
super().__init__(
model,
api_key,
base_url,
model=model,
api_key=api_key,
base_url=base_url,
openpipe_api_key=openpipe_api_key,
openpipe_base_url=openpipe_base_url,
**kwargs)

View File

@@ -80,4 +80,4 @@ class PlayHTTTSService(TTSService):
frame = AudioRawFrame(chunk, 16000, 1)
yield frame
except Exception as e:
logger.error(f"{self} error generating TTS: {e}")
logger.exception(f"{self} error generating TTS: {e}")

View File

@@ -42,7 +42,8 @@ class WhisperSTTService(STTService):
"""Class to transcribe audio with a locally-downloaded Whisper model"""
def __init__(self,
model: Model = Model.DISTIL_MEDIUM_EN,
*,
model: str | Model = Model.DISTIL_MEDIUM_EN,
device: str = "auto",
compute_type: str = "default",
no_speech_prob: float = 0.4,
@@ -51,7 +52,7 @@ class WhisperSTTService(STTService):
super().__init__(**kwargs)
self._device: str = device
self._compute_type = compute_type
self._model_name: Model = model
self._model_name: str | Model = model
self._no_speech_prob = no_speech_prob
self._model: WhisperModel | None = None
self._load()
@@ -64,7 +65,7 @@ class WhisperSTTService(STTService):
this model is being run, it will take time to download."""
logger.debug("Loading Whisper model...")
self._model = WhisperModel(
self._model_name.value,
self._model_name.value if isinstance(self._model_name, Enum) else self._model_name,
device=self._device,
compute_type=self._compute_type)
logger.debug("Loaded Whisper model")

View File

@@ -0,0 +1,112 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import aiohttp
from typing import AsyncGenerator
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame
from pipecat.services.ai_services import TTSService
from loguru import logger
import requests
import numpy as np
try:
import resampy
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use XTTS, you need to `pip install pipecat-ai[xtts]`.")
raise Exception(f"Missing module: {e}")
# The server below can connect to XTTS through a local running docker
#
# Docker command: $ docker run --gpus=all -e COQUI_TOS_AGREED=1 --rm -p 8000:80 ghcr.io/coqui-ai/xtts-streaming-server:latest-cuda121
#
# You can find more information on the official repo:
# https://github.com/coqui-ai/xtts-streaming-server
class XTTSService(TTSService):
def __init__(
self,
*,
aiohttp_session: aiohttp.ClientSession,
voice_id: str,
language: str,
base_url: str,
**kwargs):
super().__init__(**kwargs)
self._voice_id = voice_id
self._language = language
self._base_url = base_url
self._aiohttp_session = aiohttp_session
self._studio_speakers = requests.get(self._base_url + "/studio_speakers").json()
def can_generate_metrics(self) -> bool:
return True
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
embeddings = self._studio_speakers[self._voice_id]
url = self._base_url + "/tts_stream"
payload = {
"text": text.replace('.', '').replace('*', ''),
"language": self._language,
"speaker_embedding": embeddings["speaker_embedding"],
"gpt_cond_latent": embeddings["gpt_cond_latent"],
"add_wav_header": False,
"stream_chunk_size": 20,
}
await self.start_ttfb_metrics()
async with self._aiohttp_session.post(url, json=payload) as r:
if r.status != 200:
text = await r.text()
logger.error(f"{self} error getting audio (status: {r.status}, error: {text})")
yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})")
return
buffer = bytearray()
async for chunk in r.content.iter_chunked(1024):
if len(chunk) > 0:
await self.stop_ttfb_metrics()
# Append new chunk to the buffer
buffer.extend(chunk)
# Check if buffer has enough data for processing
while len(buffer) >= 48000: # Assuming at least 0.5 seconds of audio data at 24000 Hz
# Process the buffer up to a safe size for resampling
process_data = buffer[:48000]
# Remove processed data from buffer
buffer = buffer[48000:]
# Convert the byte data to numpy array for resampling
audio_np = np.frombuffer(process_data, dtype=np.int16)
# Resample the audio from 24000 Hz to 16000 Hz
resampled_audio = resampy.resample(audio_np, 24000, 16000)
# Convert the numpy array back to bytes
resampled_audio_bytes = resampled_audio.astype(np.int16).tobytes()
# Create the frame with the resampled audio
frame = AudioRawFrame(resampled_audio_bytes, 16000, 1)
yield frame
# Process any remaining data in the buffer
if len(buffer) > 0:
audio_np = np.frombuffer(buffer, dtype=np.int16)
resampled_audio = resampy.resample(audio_np, 24000, 16000)
resampled_audio_bytes = resampled_audio.astype(np.int16).tobytes()
frame = AudioRawFrame(resampled_audio_bytes, 16000, 1)
yield frame

View File

@@ -55,7 +55,7 @@ class BaseInputTransport(FrameProcessor):
async def push_audio_frame(self, frame: AudioRawFrame):
if self._params.audio_in_enabled or self._params.vad_enabled:
self._audio_in_queue.put_nowait(frame)
await self._audio_in_queue.put(frame)
#
# Frame processor
@@ -113,10 +113,15 @@ class BaseInputTransport(FrameProcessor):
# Make sure we notify about interruptions quickly out-of-band
if isinstance(frame, UserStartedSpeakingFrame):
logger.debug("User started speaking")
# Cancel the task. This will stop pushing frames downstream.
self._push_frame_task.cancel()
await self._push_frame_task
self._create_push_task()
# Push an out-of-band frame (i.e. not using the ordered push
# frame task) to stop everything, specially at the output
# transport.
await self.push_frame(StartInterruptionFrame())
# Create a new queue and task.
self._create_push_task()
elif isinstance(frame, UserStoppedSpeakingFrame):
logger.debug("User stopped speaking")
await self.push_frame(StopInterruptionFrame())
@@ -168,5 +173,5 @@ class BaseInputTransport(FrameProcessor):
await self._internal_push_frame(frame)
except asyncio.CancelledError:
break
except BaseException as e:
logger.error(f"{self} error reading audio frames: {e}")
except Exception as e:
logger.exception(f"{self} error reading audio frames: {e}")

View File

@@ -14,6 +14,7 @@ from typing import List
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
AudioRawFrame,
BotSpeakingFrame,
CancelFrame,
MetricsFrame,
SpriteFrame,
@@ -180,8 +181,8 @@ class BaseOutputTransport(FrameProcessor):
self._sink_queue.task_done()
except asyncio.CancelledError:
break
except BaseException as e:
logger.error(f"{self} error processing sink queue: {e}")
except Exception as e:
logger.exception(f"{self} error processing sink queue: {e}")
#
# Push frames task
@@ -250,7 +251,7 @@ class BaseOutputTransport(FrameProcessor):
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"{self} error writing to camera: {e}")
logger.exception(f"{self} error writing to camera: {e}")
#
# Audio out
@@ -263,4 +264,5 @@ class BaseOutputTransport(FrameProcessor):
if len(buffer) >= self._audio_chunk_size:
await self.write_raw_audio_frames(bytes(buffer[:self._audio_chunk_size]))
buffer = buffer[self._audio_chunk_size:]
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
return buffer

View File

@@ -82,5 +82,4 @@ class BaseTransport(ABC):
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.error(f"Exception in event handler {event_name}: {e}")
raise e
logger.exception(f"Exception in event handler {event_name}: {e}")

View File

@@ -12,7 +12,6 @@ import wave
from typing import Awaitable, Callable
from pydantic.main import BaseModel
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.frames.frames import AudioRawFrame, StartFrame
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.serializers.base_serializer import FrameSerializer
@@ -35,7 +34,7 @@ except ModuleNotFoundError as e:
class FastAPIWebsocketParams(TransportParams):
add_wav_header: bool = False
audio_frame_size: int = 6400 # 200ms
serializer: FrameSerializer = TwilioFrameSerializer()
serializer: FrameSerializer
class FastAPIWebsocketCallbacks(BaseModel):
@@ -114,7 +113,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
frame = wav_frame
payload = self._params.serializer.serialize(frame)
if payload:
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
await self._websocket.send_text(payload)
self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:]
@@ -125,7 +124,7 @@ class FastAPIWebsocketTransport(BaseTransport):
def __init__(
self,
websocket: WebSocket,
params: FastAPIWebsocketParams = FastAPIWebsocketParams(),
params: FastAPIWebsocketParams,
input_name: str | None = None,
output_name: str | None = None,
loop: asyncio.AbstractEventLoop | None = None):

View File

@@ -124,6 +124,9 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
self._websocket = websocket
async def write_raw_audio_frames(self, frames: bytes):
if not self._websocket:
return
self._audio_buffer += frames
while len(self._audio_buffer) >= self._params.audio_frame_size:
frame = AudioRawFrame(
@@ -148,8 +151,8 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
frame = wav_frame
proto = self._params.serializer.serialize(frame)
await self._websocket.send(proto)
if proto:
await self._websocket.send(proto)
self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:]

View File

@@ -9,7 +9,7 @@ import asyncio
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Mapping
from typing import Any, Awaitable, Callable, Mapping, Optional
from concurrent.futures import ThreadPoolExecutor
from daily import (
@@ -59,8 +59,8 @@ class DailyTransportMessageFrame(TransportMessageFrame):
class WebRTCVADAnalyzer(VADAnalyzer):
def __init__(self, sample_rate=16000, num_channels=1, params: VADParams = VADParams()):
super().__init__(sample_rate, num_channels, params)
def __init__(self, *, sample_rate=16000, num_channels=1, params: VADParams = VADParams()):
super().__init__(sample_rate=sample_rate, num_channels=num_channels, params=params)
self._webrtc_vad = Daily.create_native_vad(
reset_period_ms=VAD_RESET_PERIOD_MS,
@@ -101,7 +101,7 @@ class DailyTranscriptionSettings(BaseModel):
class DailyParams(TransportParams):
api_url: str = "https://api.daily.co/v1"
api_key: str = ""
dialin_settings: DailyDialinSettings | None = None
dialin_settings: Optional[DailyDialinSettings] = None
transcription_enabled: bool = False
transcription_settings: DailyTranscriptionSettings = DailyTranscriptionSettings()
@@ -199,6 +199,9 @@ class DailyTransportClient(EventHandler):
self._callbacks = callbacks
async def send_message(self, frame: DailyTransportMessageFrame):
if not self._client:
return
future = self._loop.create_future()
self._client.send_app_message(
frame.message,
@@ -209,19 +212,18 @@ class DailyTransportClient(EventHandler):
async def read_next_audio_frame(self) -> AudioRawFrame | None:
sample_rate = self._params.audio_in_sample_rate
num_channels = self._params.audio_in_channels
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
if self._other_participant_has_joined:
num_frames = int(sample_rate / 100) * 2 # 20ms of audio
future = self._loop.create_future()
self._speaker.read_frames(num_frames, completion=completion_callback(future))
audio = await future
future = self._loop.create_future()
self._speaker.read_frames(num_frames, completion=completion_callback(future))
audio = await future
if len(audio) > 0:
return AudioRawFrame(audio=audio, sample_rate=sample_rate, num_channels=num_channels)
else:
# If no one has ever joined the meeting `read_frames()` would block,
# instead we just wait a bit. daily-python should probably return
# silence instead.
# If we don't read any audio it could be there's no participant
# connected. daily-python will return immediately if that's the
# case, so let's sleep for a little bit (i.e. busy wait).
await asyncio.sleep(0.01)
return None
@@ -266,7 +268,7 @@ class DailyTransportClient(EventHandler):
logger.info(
f"Enabling transcription with settings {self._params.transcription_settings}")
self._client.start_transcription(
self._params.transcription_settings.model_dump())
self._params.transcription_settings.model_dump(exclude_none=True))
await self._callbacks.on_joined(data["participants"]["local"])
else:
@@ -657,11 +659,11 @@ class DailyOutputTransport(BaseOutputTransport):
await self._client.send_message(frame)
async def send_metrics(self, frame: MetricsFrame):
ttfb = [{"name": n, "time": t} for n, t in frame.ttfb.items()]
message = DailyTransportMessageFrame(message={
"type": "pipecat-metrics",
"metrics": {
"ttfb": ttfb
"ttfb": frame.ttfb or [],
"processing": frame.processing or [],
},
})
await self._client.send_message(message)
@@ -836,8 +838,8 @@ class DailyTransport(BaseTransport):
logger.debug("Event dialin-ready was handled successfully")
except asyncio.TimeoutError:
logger.error(f"Timeout handling dialin-ready event ({url})")
except BaseException as e:
logger.error(f"Error handling dialin-ready event ({url}): {e}")
except Exception as e:
logger.exception(f"Error handling dialin-ready event ({url}): {e}")
async def _on_dialin_ready(self, sip_endpoint):
if self._params.dialin_settings:

View File

@@ -2,7 +2,7 @@ from typing import List
from pipecat.processors.frame_processor import FrameProcessor
class TestException(BaseException):
class TestException(Exception):
pass

View File

@@ -33,14 +33,23 @@ _MODEL_RESET_STATES_TIME = 5.0
class SileroVADAnalyzer(VADAnalyzer):
def __init__(self, sample_rate=16000, params: VADParams = VADParams()):
def __init__(
self,
*,
sample_rate: int = 16000,
version: str = "v5.0",
params: VADParams = VADParams()):
super().__init__(sample_rate=sample_rate, num_channels=1, params=params)
if sample_rate != 16000 and sample_rate != 8000:
raise ValueError("Silero VAD sample rate needs to be 16000 or 8000")
logger.debug("Loading Silero VAD model...")
(self._model, utils) = torch.hub.load(
repo_or_dir="snakers4/silero-vad", model="silero_vad", force_reload=False
)
(self._model, _) = torch.hub.load(repo_or_dir=f"snakers4/silero-vad:{version}",
model="silero_vad",
force_reload=False,
trust_repo=True)
self._last_reset_time = 0
@@ -51,7 +60,7 @@ class SileroVADAnalyzer(VADAnalyzer):
#
def num_frames_required(self) -> int:
return int(self.sample_rate / 100) * 4 # 40ms
return 512 if self.sample_rate == 16000 else 256
def voice_confidence(self, buffer) -> float:
try:
@@ -69,9 +78,9 @@ class SileroVADAnalyzer(VADAnalyzer):
self._last_reset_time = curr_time
return new_confidence
except BaseException as e:
except Exception as e:
# This comes from an empty audio array
logger.error(f"Error analyzing audio with Silero VAD: {e}")
logger.exception(f"Error analyzing audio with Silero VAD: {e}")
return 0
@@ -79,12 +88,15 @@ class SileroVAD(FrameProcessor):
def __init__(
self,
*,
sample_rate: int = 16000,
version: str = "v5.0",
vad_params: VADParams = VADParams(),
audio_passthrough: bool = False):
super().__init__()
self._vad_analyzer = SileroVADAnalyzer(sample_rate=sample_rate, params=vad_params)
self._vad_analyzer = SileroVADAnalyzer(
sample_rate=sample_rate, version=version, params=vad_params)
self._audio_passthrough = audio_passthrough
self._processor_vad_state: VADState = VADState.QUIET

View File

@@ -28,7 +28,7 @@ class VADParams(BaseModel):
class VADAnalyzer:
def __init__(self, sample_rate: int, num_channels: int, params: VADParams):
def __init__(self, *, sample_rate: int, num_channels: int, params: VADParams):
self._sample_rate = sample_rate
self._num_channels = num_channels
self._params = params