Compare commits
125 Commits
khk/vad-ga
...
v0.0.38
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
060a22f395 | ||
|
|
d3e85355f1 | ||
|
|
83e730b768 | ||
|
|
5fcc96446c | ||
|
|
ad88925154 | ||
|
|
0a6ddbf15c | ||
|
|
08e0722d97 | ||
|
|
05d4fba551 | ||
|
|
f41c2b3c9f | ||
|
|
69f64899fe | ||
|
|
33f0865430 | ||
|
|
ad5b9202ab | ||
|
|
1676693091 | ||
|
|
0852b50b8f | ||
|
|
eb998aa502 | ||
|
|
6dab0e9de7 | ||
|
|
95ff1d141c | ||
|
|
87bc8a9da6 | ||
|
|
087fe9a537 | ||
|
|
c1170260b5 | ||
|
|
65cdf50774 | ||
|
|
9233bb490c | ||
|
|
43932220f7 | ||
|
|
cea4d1894e | ||
|
|
80baa0358d | ||
|
|
5d73db53a0 | ||
|
|
302ea90dce | ||
|
|
37b04ed283 | ||
|
|
be6995cfdf | ||
|
|
dfbc11300c | ||
|
|
82d539d174 | ||
|
|
6e00f31014 | ||
|
|
a46ac3cc92 | ||
|
|
6fbf98d8e2 | ||
|
|
f094c42728 | ||
|
|
13827e1282 | ||
|
|
32170b47d9 | ||
|
|
09c05354c2 | ||
|
|
b0b1475563 | ||
|
|
b85dd7283a | ||
|
|
846ae765e5 | ||
|
|
4c629e538e | ||
|
|
f6e22bb3b9 | ||
|
|
46a048d7f6 | ||
|
|
bd9f4eea06 | ||
|
|
0a672e61e2 | ||
|
|
29a8530221 | ||
|
|
3e738642a7 | ||
|
|
f551f55f03 | ||
|
|
9f012c8002 | ||
|
|
0a69a9e5ef | ||
|
|
194790183a | ||
|
|
2227721173 | ||
|
|
77a53da5f5 | ||
|
|
ab63ff275d | ||
|
|
e5363f65f0 | ||
|
|
ffc157de65 | ||
|
|
f9fdadb4c0 | ||
|
|
4efccb79f2 | ||
|
|
337968199a | ||
|
|
37027f68cb | ||
|
|
d1b62c5495 | ||
|
|
355fe01cb7 | ||
|
|
9d050a16c7 | ||
|
|
fa53c67606 | ||
|
|
5006376fe6 | ||
|
|
2204b8e205 | ||
|
|
270007b17c | ||
|
|
568eb2ef4c | ||
|
|
73ca9184a8 | ||
|
|
5e8e11e16e | ||
|
|
029bbc16f2 | ||
|
|
9e3d87e4f6 | ||
|
|
f1410a1127 | ||
|
|
2b980d16c3 | ||
|
|
b2b97aafb8 | ||
|
|
da2082b025 | ||
|
|
327ea9d547 | ||
|
|
b23db4a202 | ||
|
|
d1a36004ab | ||
|
|
6071920c45 | ||
|
|
5f539e1fba | ||
|
|
8e1539c360 | ||
|
|
065cfb2aca | ||
|
|
3147534e86 | ||
|
|
be5603bf16 | ||
|
|
b9b0bcdcbd | ||
|
|
5bcece56f3 | ||
|
|
d67faef88c | ||
|
|
8f6db5e905 | ||
|
|
82e93a0560 | ||
|
|
a9a82c083b | ||
|
|
974d9c33ed | ||
|
|
c1957ab694 | ||
|
|
b20a10a4bc | ||
|
|
be14ce465d | ||
|
|
d1ca0c5614 | ||
|
|
535514f506 | ||
|
|
933b63cf13 | ||
|
|
d7c3e380a5 | ||
|
|
c5298f78cb | ||
|
|
4f8f7b8d1d | ||
|
|
d7d46919ac | ||
|
|
e5d73d2e2e | ||
|
|
b145e8ec90 | ||
|
|
97ff4a1fb8 | ||
|
|
5018a552c1 | ||
|
|
7f9fd9ffce | ||
|
|
ddd0ca6a8f | ||
|
|
06f817c7e3 | ||
|
|
df4c3e56c4 | ||
|
|
9d5c2b9656 | ||
|
|
7ce59c5e2e | ||
|
|
1c9631fc78 | ||
|
|
efbe7297f7 | ||
|
|
1b45946a61 | ||
|
|
cbf5a6362c | ||
|
|
583b96c341 | ||
|
|
fc0920504d | ||
|
|
abd65a93b2 | ||
|
|
c3244fdd7a | ||
|
|
e8f58938b0 | ||
|
|
602b4f34b1 | ||
|
|
0399c84dfa | ||
|
|
fd5d879bf5 |
10
.github/workflows/publish_test.yaml
vendored
10
.github/workflows/publish_test.yaml
vendored
@@ -1,10 +1,6 @@
|
||||
name: publish-test
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
on: workflow_dispatch
|
||||
|
||||
jobs:
|
||||
build:
|
||||
@@ -13,10 +9,6 @@ jobs:
|
||||
steps:
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
ref: ${{ github.event.inputs.gitref }}
|
||||
fetch-tags: true
|
||||
fetch-depth: 100
|
||||
- name: Set up Python
|
||||
id: setup_python
|
||||
uses: actions/setup-python@v4
|
||||
|
||||
129
CHANGELOG.md
129
CHANGELOG.md
@@ -5,6 +5,135 @@ All notable changes to **pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [0.0.38] - 2024-07-23
|
||||
|
||||
### Added
|
||||
|
||||
- Added `force_reload`, `skip_validation` and `trust_repo` to `SileroVAD` and
|
||||
`SileroVADAnalyzer`. This allows caching and various GitHub repo validations.
|
||||
|
||||
- Added `send_initial_empty_metrics` flag to `PipelineParams` to request for
|
||||
initial empty metrics (zero values). True by default.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed initial metrics format. It was using the wrong keys name/time instead of
|
||||
processor/value.
|
||||
|
||||
- STT services should be using ISO 8601 time format for transcription frames.
|
||||
|
||||
- Fixed an issue that would cause Daily transport to show a stop transcription
|
||||
error when actually none occurred.
|
||||
|
||||
## [0.0.37] - 2024-07-22
|
||||
|
||||
### Added
|
||||
|
||||
- Added `RTVIProcessor` which implements the RTVI-AI standard.
|
||||
See https://github.com/rtvi-ai
|
||||
|
||||
- Added `BotInterruptionFrame` which allows interrupting the bot while talking.
|
||||
|
||||
- Added `LLMMessagesAppendFrame` which allows appending messages to the current
|
||||
LLM context.
|
||||
|
||||
- Added `LLMMessagesUpdateFrame` which allows changing the LLM context for the
|
||||
one provided in this new frame.
|
||||
|
||||
- Added `LLMModelUpdateFrame` which allows updating the LLM model.
|
||||
|
||||
- Added `TTSSpeakFrame` which causes the bot say some text. This text will not
|
||||
be part of the LLM context.
|
||||
|
||||
- Added `TTSVoiceUpdateFrame` which allows updating the TTS voice.
|
||||
|
||||
### Removed
|
||||
|
||||
- We remove the `LLMResponseStartFrame` and `LLMResponseEndFrame` frames. These
|
||||
were added in the past to properly handle interruptions for the
|
||||
`LLMAssistantContextAggregator`. But the `LLMContextAggregator` is now based
|
||||
on `LLMResponseAggregator` which handles interruptions properly by just
|
||||
processing the `StartInterruptionFrame`, so there's no need for these extra
|
||||
frames any more.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed an issue with `StatelessTextTransformer` where it was pushing a string
|
||||
instead of a `TextFrame`.
|
||||
|
||||
- `TTSService` end of sentence detection has been improved. It now works with
|
||||
acronyms, numbers, hours and others.
|
||||
|
||||
- Fixed an issue in `TTSService` that would not properly flush the current
|
||||
aggregated sentence if an `LLMFullResponseEndFrame` was found.
|
||||
|
||||
### Performance
|
||||
|
||||
- `CartesiaTTSService` now uses websockets which improves speed. It also
|
||||
leverages the new Cartesia contexts which maintains generated audio prosody
|
||||
when multiple inputs are sent, therefore improving audio quality a lot.
|
||||
|
||||
## [0.0.36] - 2024-07-02
|
||||
|
||||
### Added
|
||||
|
||||
- Added `GladiaSTTService`.
|
||||
See https://docs.gladia.io/chapters/speech-to-text-api/pages/live-speech-recognition
|
||||
|
||||
- Added `XTTSService`. This is a local Text-To-Speech service.
|
||||
See https://github.com/coqui-ai/TTS
|
||||
|
||||
- Added `UserIdleProcessor`. This processor can be used to wait for any
|
||||
interaction with the user. If the user doesn't say anything within a given
|
||||
timeout a provided callback is called.
|
||||
|
||||
- Added `IdleFrameProcessor`. This processor can be used to wait for frames
|
||||
within a given timeout. If no frame is received within the timeout a provided
|
||||
callback is called.
|
||||
|
||||
- Added new frame `BotSpeakingFrame`. This frame will be continuously pushed
|
||||
upstream while the bot is talking.
|
||||
|
||||
- It is now possible to specify a Silero VAD version when using `SileroVADAnalyzer`
|
||||
or `SileroVAD`.
|
||||
|
||||
- Added `AysncFrameProcessor` and `AsyncAIService`. Some services like
|
||||
`DeepgramSTTService` need to process things asynchronously. For example, audio
|
||||
is sent to Deepgram but transcriptions are not returned immediately. In these
|
||||
cases we still require all frames (except system frames) to be pushed
|
||||
downstream from a single task. That's what `AsyncFrameProcessor` is for. It
|
||||
creates a task and all frames should be pushed from that task. So, whenever a
|
||||
new Deepgram transcription is ready that transcription will also be pushed
|
||||
from this internal task.
|
||||
|
||||
- The `MetricsFrame` now includes processing metrics if metrics are enabled. The
|
||||
processing metrics indicate the time a processor needs to generate all its
|
||||
output. Note that not all processors generate these kind of metrics.
|
||||
|
||||
### Changed
|
||||
|
||||
- `WhisperSTTService` model can now also be a string.
|
||||
|
||||
- Added missing * keyword separators in services.
|
||||
|
||||
### Fixed
|
||||
|
||||
- `WebsocketServerTransport` doesn't try to send frames anymore if serializers
|
||||
returns `None`.
|
||||
|
||||
- Fixed an issue where exceptions that occurred inside frame processors were
|
||||
being swallowed and not displayed.
|
||||
|
||||
- Fixed an issue in `FastAPIWebsocketTransport` where it would still try to send
|
||||
data to the websocket after being closed.
|
||||
|
||||
### Other
|
||||
|
||||
- Added Fly.io deployment example in `examples/deployment/flyio-example`.
|
||||
|
||||
- Added new `17-detect-user-idle.py` example that shows how to use the new
|
||||
`UserIdleProcessor`.
|
||||
|
||||
## [0.0.35] - 2024-06-28
|
||||
|
||||
### Changed
|
||||
|
||||
@@ -39,7 +39,7 @@ pip install "pipecat-ai[option,...]"
|
||||
|
||||
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
|
||||
|
||||
- **AI services**: `anthropic`, `azure`, `deepgram`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`
|
||||
- **AI services**: `anthropic`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
|
||||
- **Transports**: `local`, `websocket`, `daily`
|
||||
|
||||
## Code examples
|
||||
@@ -70,8 +70,8 @@ async def main():
|
||||
transport = DailyTransport(
|
||||
room_url=...,
|
||||
token=...,
|
||||
"Bot Name",
|
||||
DailyParams(audio_out_enabled=True))
|
||||
bot_name="Bot Name",
|
||||
params=DailyParams(audio_out_enabled=True))
|
||||
|
||||
# Use Eleven Labs for Text-to-Speech
|
||||
tts = ElevenLabsTTSService(
|
||||
@@ -125,7 +125,7 @@ Sign up [here](https://dashboard.daily.co/u/signup) and [create a room](https://
|
||||
|
||||
Voice Activity Detection — very important for knowing when a user has finished speaking to your bot. If you are not using press-to-talk, and want Pipecat to detect when the user has finished talking, VAD is an essential component for a natural feeling conversation.
|
||||
|
||||
Pipecast makes use of WebRTC VAD by default when using a WebRTC transport layer. Optionally, you can use Silero VAD for improved accuracy at the cost of higher CPU usage.
|
||||
Pipecat makes use of WebRTC VAD by default when using a WebRTC transport layer. Optionally, you can use Silero VAD for improved accuracy at the cost of higher CPU usage.
|
||||
|
||||
```shell
|
||||
pip install pipecat-ai[silero]
|
||||
|
||||
@@ -27,6 +27,9 @@ FAL_KEY=...
|
||||
# Fireworks
|
||||
FIREWORKS_API_KEY=...
|
||||
|
||||
# Gladia
|
||||
GLADIA_API_KEY=...
|
||||
|
||||
# PlayHT
|
||||
PLAY_HT_USER_ID=...
|
||||
PLAY_HT_API_KEY=...
|
||||
|
||||
16
examples/deployment/flyio-example/Dockerfile
Normal file
16
examples/deployment/flyio-example/Dockerfile
Normal file
@@ -0,0 +1,16 @@
|
||||
FROM python:3.11-bullseye
|
||||
|
||||
# Open port 7860 for http service
|
||||
ENV FAST_API_PORT=7860
|
||||
EXPOSE 7860
|
||||
|
||||
# Install Python dependencies
|
||||
COPY *.py .
|
||||
COPY ./requirements.txt requirements.txt
|
||||
RUN pip3 install --no-cache-dir --upgrade -r requirements.txt
|
||||
|
||||
# Install models
|
||||
RUN python3 install_deps.py
|
||||
|
||||
# Start the FastAPI server
|
||||
CMD python3 bot_runner.py --port ${FAST_API_PORT}
|
||||
43
examples/deployment/flyio-example/README.md
Normal file
43
examples/deployment/flyio-example/README.md
Normal file
@@ -0,0 +1,43 @@
|
||||
# Fly.io deployment example
|
||||
|
||||
This project modifies the `bot_runner.py` server to launch a new machine for each user session. This is a recommended approach for production vs. running shell processess as your deployment will quickly run out of system resources under load.
|
||||
|
||||
To speed up machine boot times, we also download and cache Silero VAD as part of the Dockerfile (`install_deps.py`). If you are using other custom models, you can add them here too.
|
||||
|
||||
For this example, we are using Daily as a WebRTC transport and provisioning a new room and token for each session. You can use another transport, such as WebSockets, by modifying the `bot.py` and `bot_runner.py` files accordingly.
|
||||
|
||||
## Setting up your fly.io deployment
|
||||
|
||||
### Create your fly.toml file
|
||||
|
||||
You can copy the `example-fly.toml` as a reference. Be sure to change the app name to something unique.
|
||||
|
||||
### Create your .env file
|
||||
|
||||
Copy the base `env.example` to `.env` and enter the necessary API keys.
|
||||
|
||||
`FLY_APP_NAME` should match that in the `fly.toml` file.
|
||||
|
||||
### Launch a new fly.io project
|
||||
|
||||
`fly launch` or `fly launch --org your-org-name`
|
||||
|
||||
### Set the necessary app secrets from your .env
|
||||
|
||||
Note: you can do this manually via the fly.io dashboard under the "secrets" sub-section of your deployment (e.g. "https://fly.io/apps/fly-app-name/secrets") or run the following terminal command:
|
||||
|
||||
`cat .env | tr '\n' ' ' | xargs flyctl secrets set`
|
||||
|
||||
### Deploy your machine
|
||||
|
||||
`fly deploy`
|
||||
|
||||
|
||||
## Connecting to your bot
|
||||
|
||||
Send a post request to your running fly.io instance:
|
||||
|
||||
`curl --location --request POST 'https://YOUR_FLY_APP_NAME/start_bot'`
|
||||
|
||||
This request will wait until the machine enters into a `starting` state, before returning the a room URL and token to join.
|
||||
|
||||
0
examples/deployment/flyio-example/__init__.py
Normal file
0
examples/deployment/flyio-example/__init__.py
Normal file
103
examples/deployment/flyio-example/bot.py
Normal file
103
examples/deployment/flyio-example/bot.py
Normal file
@@ -0,0 +1,103 @@
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
|
||||
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
daily_api_key = os.getenv("DAILY_API_KEY", "")
|
||||
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
|
||||
|
||||
|
||||
async def main(room_url: str, token: str):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Chatbot",
|
||||
DailyParams(
|
||||
api_url=daily_api_url,
|
||||
api_key=daily_api_key,
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=False,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
transcription_enabled=True,
|
||||
)
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline([
|
||||
transport.input(),
|
||||
tma_in,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
tma_out,
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
@transport.event_handler("on_call_state_updated")
|
||||
async def on_call_state_updated(transport, state):
|
||||
if state == "left":
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Pipecat Bot")
|
||||
parser.add_argument("-u", type=str, help="Room URL")
|
||||
parser.add_argument("-t", type=str, help="Token")
|
||||
config = parser.parse_args()
|
||||
|
||||
asyncio.run(main(config.u, config.t))
|
||||
199
examples/deployment/flyio-example/bot_runner.py
Normal file
199
examples/deployment/flyio-example/bot_runner.py
Normal file
@@ -0,0 +1,199 @@
|
||||
import os
|
||||
import argparse
|
||||
import subprocess
|
||||
import requests
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
|
||||
|
||||
from fastapi import FastAPI, Request, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
# ------------ Configuration ------------ #
|
||||
|
||||
MAX_SESSION_TIME = 5 * 60 # 5 minutes
|
||||
REQUIRED_ENV_VARS = [
|
||||
'DAILY_API_KEY',
|
||||
'OPENAI_API_KEY',
|
||||
'ELEVENLABS_API_KEY',
|
||||
'ELEVENLABS_VOICE_ID',
|
||||
'FLY_API_KEY',
|
||||
'FLY_APP_NAME',]
|
||||
|
||||
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
|
||||
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "pipecat-fly-example")
|
||||
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
|
||||
FLY_HEADERS = {
|
||||
'Authorization': f"Bearer {FLY_API_KEY}",
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
os.getenv("DAILY_API_KEY", ""),
|
||||
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
|
||||
|
||||
|
||||
# ----------------- API ----------------- #
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"]
|
||||
)
|
||||
|
||||
# ----------------- Main ----------------- #
|
||||
|
||||
|
||||
def spawn_fly_machine(room_url: str, token: str):
|
||||
# Use the same image as the bot runner
|
||||
res = requests.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS)
|
||||
if res.status_code != 200:
|
||||
raise Exception(f"Unable to get machine info from Fly: {res.text}")
|
||||
image = res.json()[0]['config']['image']
|
||||
|
||||
# Machine configuration
|
||||
cmd = f"python3 bot.py -u {room_url} -t {token}"
|
||||
cmd = cmd.split()
|
||||
worker_props = {
|
||||
"config": {
|
||||
"image": image,
|
||||
"auto_destroy": True,
|
||||
"init": {
|
||||
"cmd": cmd
|
||||
},
|
||||
"restart": {
|
||||
"policy": "no"
|
||||
},
|
||||
"guest": {
|
||||
"cpu_kind": "shared",
|
||||
"cpus": 1,
|
||||
"memory_mb": 1024
|
||||
}
|
||||
},
|
||||
|
||||
}
|
||||
|
||||
# Spawn a new machine instance
|
||||
res = requests.post(
|
||||
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines",
|
||||
headers=FLY_HEADERS,
|
||||
json=worker_props)
|
||||
|
||||
if res.status_code != 200:
|
||||
raise Exception(f"Problem starting a bot worker: {res.text}")
|
||||
|
||||
# Wait for the machine to enter the started state
|
||||
vm_id = res.json()['id']
|
||||
|
||||
res = requests.get(
|
||||
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
|
||||
headers=FLY_HEADERS)
|
||||
|
||||
if res.status_code != 200:
|
||||
raise Exception(f"Bot was unable to enter started state: {res.text}")
|
||||
|
||||
print(f"Machine joined room: {room_url}")
|
||||
|
||||
|
||||
@app.post("/start_bot")
|
||||
async def start_bot(request: Request) -> JSONResponse:
|
||||
try:
|
||||
data = await request.json()
|
||||
# Is this a webhook creation request?
|
||||
if "test" in data:
|
||||
return JSONResponse({"test": True})
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
# Use specified room URL, or create a new one if not specified
|
||||
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
|
||||
|
||||
if not room_url:
|
||||
params = DailyRoomParams(
|
||||
properties=DailyRoomProperties()
|
||||
)
|
||||
try:
|
||||
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Unable to provision room {e}")
|
||||
else:
|
||||
# Check passed room URL exists, we should assume that it already has a sip set up
|
||||
try:
|
||||
room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
|
||||
except Exception:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Room not found: {room_url}")
|
||||
|
||||
# Give the agent a token to join the session
|
||||
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
|
||||
|
||||
if not room or not token:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to get token for room: {room_url}")
|
||||
|
||||
# Launch a new fly.io machine, or run as a shell process (not recommended)
|
||||
run_as_process = os.getenv("RUN_AS_PROCESS", False)
|
||||
|
||||
if run_as_process:
|
||||
try:
|
||||
subprocess.Popen(
|
||||
[f"python3 -m bot -u {room.url} -t {token}"],
|
||||
shell=True,
|
||||
bufsize=1,
|
||||
cwd=os.path.dirname(os.path.abspath(__file__)))
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to start subprocess: {e}")
|
||||
else:
|
||||
try:
|
||||
spawn_fly_machine(room.url, token)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to spawn VM: {e}")
|
||||
|
||||
# Grab a token for the user to join with
|
||||
user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
|
||||
|
||||
return JSONResponse({
|
||||
"room_url": room.url,
|
||||
"token": user_token,
|
||||
})
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Check environment variables
|
||||
for env_var in REQUIRED_ENV_VARS:
|
||||
if env_var not in os.environ:
|
||||
raise Exception(f"Missing environment variable: {env_var}.")
|
||||
|
||||
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
|
||||
parser.add_argument("--host", type=str,
|
||||
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
|
||||
parser.add_argument("--port", type=int,
|
||||
default=os.getenv("PORT", 7860), help="Port number")
|
||||
parser.add_argument("--reload", action="store_true",
|
||||
default=False, help="Reload code on change")
|
||||
|
||||
config = parser.parse_args()
|
||||
|
||||
try:
|
||||
import uvicorn
|
||||
|
||||
uvicorn.run(
|
||||
"bot_runner:app",
|
||||
host=config.host,
|
||||
port=config.port,
|
||||
reload=config.reload
|
||||
)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("Pipecat runner shutting down...")
|
||||
8
examples/deployment/flyio-example/env.example
Normal file
8
examples/deployment/flyio-example/env.example
Normal file
@@ -0,0 +1,8 @@
|
||||
DAILY_API_KEY=
|
||||
DAILY_SAMPLE_ROOM_URL= # Enter a Daily room URL to use a set room URL each time (useful for local testing)
|
||||
OPENAI_API_KEY=
|
||||
ELEVENLABS_API_KEY=
|
||||
ELEVENLABS_VOICE_ID=
|
||||
FLY_API_KEY=
|
||||
FLY_APP_NAME=
|
||||
RUN_AS_PROCESS= # Spawn fly.io machine for each session or run as local process
|
||||
25
examples/deployment/flyio-example/example-fly.toml
Normal file
25
examples/deployment/flyio-example/example-fly.toml
Normal file
@@ -0,0 +1,25 @@
|
||||
# fly.toml app configuration file generated for pipecat-fly-example on 2024-07-01T15:04:53+01:00
|
||||
#
|
||||
# See https://fly.io/docs/reference/configuration/ for information about how to use this file.
|
||||
#
|
||||
|
||||
app = 'pipecat-fly-example'
|
||||
primary_region = 'sjc'
|
||||
|
||||
[build]
|
||||
|
||||
[env]
|
||||
FLY_APP_NAME = 'pipecat-fly-example'
|
||||
|
||||
[http_service]
|
||||
internal_port = 7860
|
||||
force_https = true
|
||||
auto_stop_machines = true
|
||||
auto_start_machines = true
|
||||
min_machines_running = 0
|
||||
processes = ['app']
|
||||
|
||||
[[vm]]
|
||||
memory = 512
|
||||
cpu_kind = 'shared'
|
||||
cpus = 1
|
||||
4
examples/deployment/flyio-example/install_deps.py
Normal file
4
examples/deployment/flyio-example/install_deps.py
Normal file
@@ -0,0 +1,4 @@
|
||||
import torch
|
||||
|
||||
# Download (cache) the Silero VAD model
|
||||
torch.hub.load(repo_or_dir='snakers4/silero-vad', model='silero_vad', force_reload=True)
|
||||
6
examples/deployment/flyio-example/requirements.txt
Normal file
6
examples/deployment/flyio-example/requirements.txt
Normal file
@@ -0,0 +1,6 @@
|
||||
pipecat-ai[daily,openai,silero]
|
||||
fastapi
|
||||
uvicorn
|
||||
requests
|
||||
python-dotenv
|
||||
loguru
|
||||
@@ -67,11 +67,12 @@ async def main(room_url: str, token):
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=True,
|
||||
camera_out_width=1024,
|
||||
camera_out_height=1024,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer()
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
)
|
||||
)
|
||||
|
||||
@@ -116,7 +117,7 @@ async def main(room_url: str, token):
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
participant_name = participant["info"]["userName"] or ''
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([TextFrame(f"Hi, this is {participant_name}.")])
|
||||
await task.queue_frames([TextFrame(f"Hi there {participant_name}!")])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ async def main(room_url: str, token):
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_sample_rate=44100,
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
@@ -47,6 +48,7 @@ async def main(room_url: str, token):
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
|
||||
sample_rate=44100,
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
@@ -68,11 +70,11 @@ async def main(room_url: str, token):
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
tma_out, # Goes before the transport because cartesia has word-level timestamps!
|
||||
transport.output(), # Transport bot output
|
||||
tma_out # Assistant spoken responses
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True, enable_metrics=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
96
examples/foundational/07i-interruptible-xtts.py
Normal file
96
examples/foundational/07i-interruptible-xtts.py
Normal file
@@ -0,0 +1,96 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
|
||||
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.services.xtts import XTTSService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
)
|
||||
)
|
||||
|
||||
tts = XTTSService(
|
||||
aiohttp_session=session,
|
||||
voice_id="Claribel Dervla",
|
||||
language="en",
|
||||
base_url="http://localhost:8000"
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline([
|
||||
transport.input(), # Transport user input
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
tma_out # Assistant spoken responses
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
asyncio.run(main(url, token))
|
||||
101
examples/foundational/07j-interruptible-gladia.py
Normal file
101
examples/foundational/07j-interruptible-gladia.py
Normal file
@@ -0,0 +1,101 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
|
||||
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
|
||||
from pipecat.services.gladia import GladiaSTTService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.services.xtts import XTTSService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
)
|
||||
)
|
||||
|
||||
stt = GladiaSTTService(
|
||||
api_key=os.getenv("GLADIA_API_KEY"),
|
||||
)
|
||||
|
||||
tts = DeepgramTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("DEEPGRAM_API_KEY"),
|
||||
voice="aura-helios-en"
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
pipeline = Pipeline([
|
||||
transport.input(), # Transport user input
|
||||
stt, # STT
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
tma_out # Assistant spoken responses
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
asyncio.run(main(url, token))
|
||||
108
examples/foundational/17-detect-user-idle.py
Normal file
108
examples/foundational/17-detect-user-idle.py
Normal file
@@ -0,0 +1,108 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
import sys
|
||||
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.processors.user_idle_processor import UserIdleProcessor
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.vad.silero import SileroVADAnalyzer
|
||||
|
||||
from runner import configure
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main(room_url: str, token):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Respond bot",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
transcription_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer()
|
||||
)
|
||||
)
|
||||
|
||||
tts = ElevenLabsTTSService(
|
||||
aiohttp_session=session,
|
||||
api_key=os.getenv("ELEVENLABS_API_KEY"),
|
||||
voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
model="gpt-4o")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
},
|
||||
]
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
async def user_idle_callback(user_idle: UserIdleProcessor):
|
||||
messages.append(
|
||||
{"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."})
|
||||
await user_idle.queue_frame(LLMMessagesFrame(messages))
|
||||
|
||||
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
|
||||
|
||||
pipeline = Pipeline([
|
||||
transport.input(), # Transport user input
|
||||
user_idle, # Idle user check-in
|
||||
tma_in, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
tma_out # Assistant spoken responses
|
||||
])
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
transport.capture_participant_transcription(participant["id"])
|
||||
# Kick off the conversation.
|
||||
messages.append(
|
||||
{"role": "system", "content": "Please introduce yourself to the user."})
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
(url, token) = configure()
|
||||
asyncio.run(main(url, token))
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM python:3.11-bullseye
|
||||
FROM python:3.11-slim-bookworm
|
||||
|
||||
ARG DEBIAN_FRONTEND=noninteractive
|
||||
ARG USE_PERSISTENT_DATA
|
||||
@@ -51,4 +51,4 @@ COPY --chown=user ./frontend/ frontend/
|
||||
RUN cd frontend && npm install && npm run build
|
||||
|
||||
# Start the FastAPI server
|
||||
CMD python3 src/server.py --port ${FAST_API_PORT}
|
||||
CMD python3 src/bot_runner.py --port ${FAST_API_PORT}
|
||||
@@ -48,6 +48,8 @@ pip install -r requirements.txt
|
||||
mv env.example .env
|
||||
```
|
||||
|
||||
When deploying to production, to ensure only this app can spawn a new bot, set your `ENV` to `production`
|
||||
|
||||
**Build the frontend:**
|
||||
|
||||
This project uses a custom frontend, which needs to built. Note: this is done automatically as part of the Docker deployment.
|
||||
@@ -64,11 +66,11 @@ The build UI files can be found in `frontend/out`
|
||||
|
||||
Start the API / bot manager:
|
||||
|
||||
`python src/server.py`
|
||||
`python src/bot_runner.py`
|
||||
|
||||
If you'd like to run a custom domain or port:
|
||||
|
||||
`python src/server.py --host somehost --p 7777`
|
||||
`python src/bot_runner.py --host somehost --p someport`
|
||||
|
||||
➡️ Open the host URL in your browser `http://localhost:7860`
|
||||
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
DAILY_API_KEY=7df...
|
||||
ELEVENLABS_API_KEY=aeb...
|
||||
ELEVENLABS_VOICE_ID=7S...
|
||||
FAL_KEY=8c...
|
||||
OPENAI_API_KEY=sk-PL...
|
||||
DAILY_API_KEY=
|
||||
DAILY_SAMPLE_ROOM_URL=
|
||||
ELEVENLABS_API_KEY=
|
||||
ELEVENLABS_VOICE_ID=
|
||||
FAL_KEY=
|
||||
OPENAI_API_KEY=
|
||||
|
||||
ENV= # dev | production
|
||||
RUN_AS_VM= # Set this if you want to run bots on process (not launch a new VM)
|
||||
@@ -27,14 +27,11 @@ export default function Call() {
|
||||
|
||||
// Create a new room for the story session
|
||||
try {
|
||||
const response = await fetch("/create", {
|
||||
const response = await fetch("/start_bot", {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
room_url: process.env.NEXT_PUBLIC_ROOM_URL || null,
|
||||
}),
|
||||
});
|
||||
|
||||
const { room_url, token } = await response.json();
|
||||
@@ -55,21 +52,9 @@ export default function Call() {
|
||||
// Disable local audio, the bot will say hello first
|
||||
daily.setLocalAudio(false);
|
||||
|
||||
// Start the bot
|
||||
const resp = await fetch("/start", {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
room_url,
|
||||
}),
|
||||
});
|
||||
|
||||
setState("started");
|
||||
} catch (error) {
|
||||
setState("error");
|
||||
leave();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,7 +64,13 @@ export default function Call() {
|
||||
}
|
||||
|
||||
if (state === "error") {
|
||||
return <div>An Error occured</div>;
|
||||
return (
|
||||
<div className="flex items-center mx-auto">
|
||||
<p className="text-red-500 font-semibold bg-white px-4 py-2 shadow-xl rounded-lg">
|
||||
This demo is currently at capacity. Please try again later.
|
||||
</p>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
if (state === "started") {
|
||||
|
||||
@@ -108,26 +108,26 @@ export default function DevicePicker({}: Props) {
|
||||
{hasMicError && (
|
||||
<div className="error">
|
||||
{micState === "blocked" ? (
|
||||
<p>
|
||||
<p className="text-red-500">
|
||||
Please check your browser and system permissions. Make sure that
|
||||
this app is allowed to access your microphone.
|
||||
</p>
|
||||
) : micState === "in-use" ? (
|
||||
<p>
|
||||
<p className="text-red-500">
|
||||
Your microphone is being used by another app. Please close any
|
||||
other apps using your microphone and restart this app.
|
||||
</p>
|
||||
) : micState === "not-found" ? (
|
||||
<p>
|
||||
<p className="text-red-500">
|
||||
No microphone seems to be connected. Please connect a microphone.
|
||||
</p>
|
||||
) : micState === "not-supported" ? (
|
||||
<p>
|
||||
<p className="text-red-500">
|
||||
This app is not supported on your device. Please update your
|
||||
software or use a different device.
|
||||
</p>
|
||||
) : (
|
||||
<p>
|
||||
<p className="text-red-500">
|
||||
There seems to be an issue accessing your microphone. Try
|
||||
restarting the app or consult a system administrator.
|
||||
</p>
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import React from "react";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import DevicePicker from "@/components/DevicePicker";
|
||||
import { IconEar, IconLoader2 } from "@tabler/icons-react";
|
||||
import { IconAlertCircle, IconEar, IconLoader2 } from "@tabler/icons-react";
|
||||
|
||||
type SetupProps = {
|
||||
handleStart: () => void;
|
||||
@@ -24,7 +24,6 @@ export const Setup: React.FC<SetupProps> = ({ handleStart }) => {
|
||||
<h1 className="text-4xl font-bold text-pretty tracking-tighter mb-4">
|
||||
Welcome to <span className="text-sky-500">Storytime</span>
|
||||
</h1>
|
||||
|
||||
{state === "intro" ? (
|
||||
<>
|
||||
<p className="text-gray-600 leading-relaxed text-pretty">
|
||||
@@ -38,6 +37,9 @@ export const Setup: React.FC<SetupProps> = ({ handleStart }) => {
|
||||
<IconEar size={24} /> For best results, try in a quiet
|
||||
environment!
|
||||
</p>
|
||||
<p className="flex flex-row gap-2 text-gray-600 font-medium text-red-500">
|
||||
<IconAlertCircle size={24} /> This demo expires after 5 minutes.
|
||||
</p>
|
||||
</>
|
||||
) : (
|
||||
<>
|
||||
@@ -49,7 +51,6 @@ export const Setup: React.FC<SetupProps> = ({ handleStart }) => {
|
||||
<DevicePicker />
|
||||
</>
|
||||
)}
|
||||
|
||||
<hr className="border-gray-150 my-2" />
|
||||
|
||||
<Button
|
||||
|
||||
@@ -1,2 +1 @@
|
||||
NEXT_PUBLIC_ROOM_URL=
|
||||
SITE_URL=
|
||||
@@ -5,7 +5,7 @@ import os
|
||||
import sys
|
||||
|
||||
|
||||
from pipecat.frames.frames import LLMMessagesFrame, StopTaskFrame
|
||||
from pipecat.frames.frames import LLMMessagesFrame, StopTaskFrame, EndFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
@@ -139,6 +139,16 @@ async def main(room_url, token=None):
|
||||
|
||||
main_task = PipelineTask(main_pipeline)
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
intro_task.queue_frame(EndFrame())
|
||||
await main_task.queue_frame(EndFrame())
|
||||
|
||||
@transport.event_handler("on_call_state_updated")
|
||||
async def on_call_state_updated(transport, state):
|
||||
if state == "left":
|
||||
await main_task.queue_frame(EndFrame())
|
||||
|
||||
await runner.run(main_task)
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
233
examples/storytelling-chatbot/src/bot_runner.py
Normal file
233
examples/storytelling-chatbot/src/bot_runner.py
Normal file
@@ -0,0 +1,233 @@
|
||||
import os
|
||||
import argparse
|
||||
import subprocess
|
||||
import requests
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import FastAPI, Request, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.responses import FileResponse, JSONResponse
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
|
||||
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(override=True)
|
||||
|
||||
# ------------ Fast API Config ------------ #
|
||||
|
||||
MAX_SESSION_TIME = 5 * 60 # 5 minutes
|
||||
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
os.getenv("DAILY_API_KEY", ""),
|
||||
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# Mount the static directory
|
||||
STATIC_DIR = "frontend/out"
|
||||
|
||||
|
||||
# ------------ Fast API Routes ------------ #
|
||||
|
||||
app.mount("/static", StaticFiles(directory=STATIC_DIR, html=True), name="static")
|
||||
|
||||
|
||||
@app.post("/start_bot")
|
||||
async def start_bot(request: Request) -> JSONResponse:
|
||||
if os.getenv("ENV", "dev") == "production":
|
||||
# Only allow requests from the specified domain
|
||||
host_header = request.headers.get("host")
|
||||
allowed_domains = ["storytelling-chatbot.fly.dev", "www.storytelling-chatbot.fly.dev"]
|
||||
# Check if the Host header matches the allowed domain
|
||||
if host_header not in allowed_domains:
|
||||
raise HTTPException(status_code=403, detail="Access denied")
|
||||
|
||||
try:
|
||||
data = await request.json()
|
||||
# Is this a webhook creation request?
|
||||
if "test" in data:
|
||||
return JSONResponse({"test": True})
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
# Use specified room URL, or create a new one if not specified
|
||||
room_url = os.getenv("DAILY_SAMPLE_ROOM_URL", "")
|
||||
|
||||
if not room_url:
|
||||
params = DailyRoomParams(
|
||||
properties=DailyRoomProperties()
|
||||
)
|
||||
try:
|
||||
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Unable to provision room {e}")
|
||||
else:
|
||||
# Check passed room URL exists, we should assume that it already has a sip set up
|
||||
try:
|
||||
room: DailyRoomObject = daily_rest_helper.get_room_from_url(room_url)
|
||||
except Exception:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Room not found: {room_url}")
|
||||
|
||||
# Give the agent a token to join the session
|
||||
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
|
||||
|
||||
if not room or not token:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to get token for room: {room_url}")
|
||||
|
||||
# Launch a new VM, or run as a shell process (not recommended)
|
||||
if os.getenv("RUN_AS_VM", False):
|
||||
try:
|
||||
virtualize_bot(room.url, token)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to spawn VM: {e}")
|
||||
else:
|
||||
try:
|
||||
subprocess.Popen(
|
||||
[f"python3 -m bot -u {room.url} -t {token}"],
|
||||
shell=True,
|
||||
bufsize=1,
|
||||
cwd=os.path.dirname(os.path.abspath(__file__)))
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to start subprocess: {e}")
|
||||
|
||||
# Grab a token for the user to join with
|
||||
user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
|
||||
|
||||
return JSONResponse({
|
||||
"room_url": room.url,
|
||||
"token": user_token,
|
||||
})
|
||||
|
||||
|
||||
@app.get("/{path_name:path}", response_class=FileResponse)
|
||||
async def catch_all(path_name: Optional[str] = ""):
|
||||
if path_name == "":
|
||||
return FileResponse(f"{STATIC_DIR}/index.html")
|
||||
|
||||
file_path = Path(STATIC_DIR) / (path_name or "")
|
||||
|
||||
if file_path.is_file():
|
||||
return file_path
|
||||
|
||||
html_file_path = file_path.with_suffix(".html")
|
||||
if html_file_path.is_file():
|
||||
return FileResponse(html_file_path)
|
||||
|
||||
raise HTTPException(status_code=450, detail="Incorrect API call")
|
||||
|
||||
|
||||
# ------------ Virtualization ------------ #
|
||||
|
||||
def virtualize_bot(room_url: str, token: str):
|
||||
"""
|
||||
This is an example of how to virtualize the bot using Fly.io
|
||||
You can adapt this method to use whichever cloud provider you prefer.
|
||||
"""
|
||||
FLY_API_HOST = os.getenv("FLY_API_HOST", "https://api.machines.dev/v1")
|
||||
FLY_APP_NAME = os.getenv("FLY_APP_NAME", "storytelling-chatbot")
|
||||
FLY_API_KEY = os.getenv("FLY_API_KEY", "")
|
||||
FLY_HEADERS = {
|
||||
'Authorization': f"Bearer {FLY_API_KEY}",
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# Use the same image as the bot runner
|
||||
res = requests.get(f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines", headers=FLY_HEADERS)
|
||||
if res.status_code != 200:
|
||||
raise Exception(f"Unable to get machine info from Fly: {res.text}")
|
||||
image = res.json()[0]['config']['image']
|
||||
|
||||
# Machine configuration
|
||||
cmd = f"python3 src/bot.py -u {room_url} -t {token}"
|
||||
cmd = cmd.split()
|
||||
worker_props = {
|
||||
"config": {
|
||||
"image": image,
|
||||
"auto_destroy": True,
|
||||
"init": {
|
||||
"cmd": cmd
|
||||
},
|
||||
"restart": {
|
||||
"policy": "no"
|
||||
},
|
||||
"guest": {
|
||||
"cpu_kind": "shared",
|
||||
"cpus": 1,
|
||||
"memory_mb": 512
|
||||
}
|
||||
},
|
||||
|
||||
}
|
||||
|
||||
# Spawn a new machine instance
|
||||
res = requests.post(
|
||||
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines",
|
||||
headers=FLY_HEADERS,
|
||||
json=worker_props)
|
||||
|
||||
if res.status_code != 200:
|
||||
raise Exception(f"Problem starting a bot worker: {res.text}")
|
||||
|
||||
# Wait for the machine to enter the started state
|
||||
vm_id = res.json()['id']
|
||||
|
||||
res = requests.get(
|
||||
f"{FLY_API_HOST}/apps/{FLY_APP_NAME}/machines/{vm_id}/wait?state=started",
|
||||
headers=FLY_HEADERS)
|
||||
|
||||
if res.status_code != 200:
|
||||
raise Exception(f"Bot was unable to enter started state: {res.text}")
|
||||
|
||||
print(f"Machine joined room: {room_url}")
|
||||
|
||||
|
||||
# ------------ Main ------------ #
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Check environment variables
|
||||
required_env_vars = ['OPENAI_API_KEY', 'DAILY_API_KEY',
|
||||
'FAL_KEY', 'ELEVENLABS_VOICE_ID', 'ELEVENLABS_API_KEY']
|
||||
for env_var in required_env_vars:
|
||||
if env_var not in os.environ:
|
||||
raise Exception(f"Missing environment variable: {env_var}.")
|
||||
|
||||
import uvicorn
|
||||
|
||||
default_host = os.getenv("HOST", "0.0.0.0")
|
||||
default_port = int(os.getenv("FAST_API_PORT", "7860"))
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Daily Storyteller FastAPI server")
|
||||
parser.add_argument("--host", type=str,
|
||||
default=default_host, help="Host address")
|
||||
parser.add_argument("--port", type=int,
|
||||
default=default_port, help="Port number")
|
||||
parser.add_argument("--reload", action="store_true",
|
||||
help="Reload code on change")
|
||||
|
||||
config = parser.parse_args()
|
||||
|
||||
uvicorn.run(
|
||||
"bot_runner:app",
|
||||
host=config.host,
|
||||
port=config.port,
|
||||
reload=config.reload
|
||||
)
|
||||
@@ -1,175 +0,0 @@
|
||||
import os
|
||||
import argparse
|
||||
import subprocess
|
||||
import atexit
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import FastAPI, Request, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.responses import FileResponse, JSONResponse
|
||||
|
||||
from utils.daily_helpers import create_room as _create_room, get_token, get_name_from_url
|
||||
|
||||
MAX_BOTS_PER_ROOM = 1
|
||||
|
||||
# Bot sub-process dict for status reporting and concurrency control
|
||||
bot_procs = {}
|
||||
|
||||
|
||||
def cleanup():
|
||||
# Clean up function, just to be extra safe
|
||||
for proc in bot_procs.values():
|
||||
proc.terminate()
|
||||
proc.wait()
|
||||
|
||||
|
||||
atexit.register(cleanup)
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# Mount the static directory
|
||||
STATIC_DIR = "frontend/out"
|
||||
|
||||
app.mount("/static", StaticFiles(directory=STATIC_DIR, html=True), name="static")
|
||||
|
||||
|
||||
@app.post("/create")
|
||||
async def create_room(request: Request) -> JSONResponse:
|
||||
data = await request.json()
|
||||
|
||||
if data.get('room_url') is not None:
|
||||
room_url = data.get('room_url')
|
||||
room_name = get_name_from_url(room_url)
|
||||
else:
|
||||
room_url, room_name = _create_room()
|
||||
|
||||
token = get_token(room_url)
|
||||
|
||||
return JSONResponse({"room_url": room_url, "room_name": room_name, "token": token})
|
||||
|
||||
|
||||
@app.post("/start")
|
||||
async def start_agent(request: Request) -> JSONResponse:
|
||||
data = await request.json()
|
||||
|
||||
# Is this a webhook creation request?
|
||||
if "test" in data:
|
||||
return JSONResponse({"test": True})
|
||||
|
||||
# Ensure the room property is present
|
||||
room_url = data.get('room_url')
|
||||
if not room_url:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Missing 'room' property in request data. Cannot start agent without a target room!")
|
||||
|
||||
# Check if there is already an existing process running in this room
|
||||
num_bots_in_room = sum(
|
||||
1 for proc in bot_procs.values() if proc[1] == room_url and proc[0].poll() is None)
|
||||
if num_bots_in_room >= MAX_BOTS_PER_ROOM:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Max bot limited reach for room: {room_url}")
|
||||
|
||||
# Get the token for the room
|
||||
token = get_token(room_url)
|
||||
|
||||
if not token:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to get token for room: {room_url}")
|
||||
|
||||
# Spawn a new agent, and join the user session
|
||||
# Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
|
||||
try:
|
||||
proc = subprocess.Popen(
|
||||
[
|
||||
f"python3 -m bot -u {room_url} -t {token}"
|
||||
],
|
||||
shell=True,
|
||||
bufsize=1,
|
||||
cwd=os.path.dirname(os.path.abspath(__file__))
|
||||
)
|
||||
bot_procs[proc.pid] = (proc, room_url)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to start subprocess: {e}")
|
||||
|
||||
return JSONResponse({"bot_id": proc.pid, "room_url": room_url})
|
||||
|
||||
|
||||
@app.get("/status/{pid}")
|
||||
def get_status(pid: int):
|
||||
# Look up the subprocess
|
||||
proc = bot_procs.get(pid)
|
||||
|
||||
# If the subprocess doesn't exist, return an error
|
||||
if not proc:
|
||||
raise HTTPException(
|
||||
status_code=404, detail=f"Bot with process id: {pid} not found")
|
||||
|
||||
# Check the status of the subprocess
|
||||
if proc[0].poll() is None:
|
||||
status = "running"
|
||||
else:
|
||||
status = "finished"
|
||||
|
||||
return JSONResponse({"bot_id": pid, "status": status})
|
||||
|
||||
|
||||
@app.get("/{path_name:path}", response_class=FileResponse)
|
||||
async def catch_all(path_name: Optional[str] = ""):
|
||||
if path_name == "":
|
||||
return FileResponse(f"{STATIC_DIR}/index.html")
|
||||
|
||||
file_path = Path(STATIC_DIR) / (path_name or "")
|
||||
|
||||
if file_path.is_file():
|
||||
return file_path
|
||||
|
||||
html_file_path = file_path.with_suffix(".html")
|
||||
if html_file_path.is_file():
|
||||
return FileResponse(html_file_path)
|
||||
|
||||
raise HTTPException(status_code=450, detail="Incorrect API call")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Check environment variables
|
||||
required_env_vars = ['OPENAI_API_KEY', 'DAILY_API_KEY',
|
||||
'FAL_KEY', 'ELEVENLABS_VOICE_ID', 'ELEVENLABS_API_KEY']
|
||||
for env_var in required_env_vars:
|
||||
if env_var not in os.environ:
|
||||
raise Exception(f"Missing environment variable: {env_var}.")
|
||||
|
||||
import uvicorn
|
||||
|
||||
default_host = os.getenv("HOST", "0.0.0.0")
|
||||
default_port = int(os.getenv("FAST_API_PORT", "7860"))
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Daily Storyteller FastAPI server")
|
||||
parser.add_argument("--host", type=str,
|
||||
default=default_host, help="Host address")
|
||||
parser.add_argument("--port", type=int,
|
||||
default=default_port, help="Port number")
|
||||
parser.add_argument("--reload", action="store_true",
|
||||
help="Reload code on change")
|
||||
|
||||
config = parser.parse_args()
|
||||
|
||||
uvicorn.run(
|
||||
"server:app",
|
||||
host=config.host,
|
||||
port=config.port,
|
||||
reload=config.reload
|
||||
)
|
||||
@@ -17,7 +17,7 @@ aiosignal==1.3.1
|
||||
# via aiohttp
|
||||
annotated-types==0.7.0
|
||||
# via pydantic
|
||||
anthropic==0.25.9
|
||||
anthropic==0.28.1
|
||||
# via
|
||||
# openpipe
|
||||
# pipecat-ai (pyproject.toml)
|
||||
@@ -36,15 +36,15 @@ attrs==23.2.0
|
||||
# via
|
||||
# aiohttp
|
||||
# openpipe
|
||||
av==12.1.0
|
||||
av==12.2.0
|
||||
# via faster-whisper
|
||||
azure-cognitiveservices-speech==1.37.0
|
||||
azure-cognitiveservices-speech==1.38.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
blinker==1.8.2
|
||||
# via flask
|
||||
cachetools==5.3.3
|
||||
# via google-auth
|
||||
cartesia==1.0.0
|
||||
cartesia==1.0.3
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
certifi==2024.6.2
|
||||
# via
|
||||
@@ -84,13 +84,13 @@ exceptiongroup==1.2.1
|
||||
# via
|
||||
# anyio
|
||||
# pytest
|
||||
fal-client==0.4.0
|
||||
fal-client==0.4.1
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
fastapi==0.111.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
fastapi-cli==0.0.4
|
||||
# via fastapi
|
||||
faster-whisper==1.0.2
|
||||
faster-whisper==1.0.3
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
filelock==3.15.4
|
||||
# via
|
||||
@@ -111,22 +111,22 @@ frozenlist==1.4.1
|
||||
# via
|
||||
# aiohttp
|
||||
# aiosignal
|
||||
fsspec==2024.6.0
|
||||
fsspec==2024.6.1
|
||||
# via
|
||||
# huggingface-hub
|
||||
# torch
|
||||
future==1.0.0
|
||||
# via pyloudnorm
|
||||
google-ai-generativelanguage==0.6.4
|
||||
google-ai-generativelanguage==0.6.6
|
||||
# via google-generativeai
|
||||
google-api-core[grpc]==2.19.1
|
||||
# via
|
||||
# google-ai-generativelanguage
|
||||
# google-api-python-client
|
||||
# google-generativeai
|
||||
google-api-python-client==2.134.0
|
||||
google-api-python-client==2.135.0
|
||||
# via google-generativeai
|
||||
google-auth==2.30.0
|
||||
google-auth==2.31.0
|
||||
# via
|
||||
# google-ai-generativelanguage
|
||||
# google-api-core
|
||||
@@ -135,7 +135,7 @@ google-auth==2.30.0
|
||||
# google-generativeai
|
||||
google-auth-httplib2==0.2.0
|
||||
# via google-api-python-client
|
||||
google-generativeai==0.5.4
|
||||
google-generativeai==0.7.1
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
googleapis-common-protos==1.63.2
|
||||
# via
|
||||
@@ -197,31 +197,35 @@ jinja2==3.1.4
|
||||
# fastapi
|
||||
# flask
|
||||
# torch
|
||||
jiter==0.5.0
|
||||
# via anthropic
|
||||
jsonpatch==1.33
|
||||
# via langchain-core
|
||||
jsonpointer==3.0.0
|
||||
# via jsonpatch
|
||||
langchain==0.2.5
|
||||
langchain==0.2.6
|
||||
# via
|
||||
# langchain-community
|
||||
# pipecat-ai (pyproject.toml)
|
||||
langchain-community==0.2.5
|
||||
langchain-community==0.2.6
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
langchain-core==0.2.9
|
||||
langchain-core==0.2.10
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
# langchain-openai
|
||||
# langchain-text-splitters
|
||||
langchain-openai==0.1.9
|
||||
langchain-openai==0.1.10
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
langchain-text-splitters==0.2.1
|
||||
langchain-text-splitters==0.2.2
|
||||
# via langchain
|
||||
langsmith==0.1.82
|
||||
langsmith==0.1.83
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
# langchain-core
|
||||
llvmlite==0.43.0
|
||||
# via numba
|
||||
loguru==0.7.2
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
markdown-it-py==3.0.0
|
||||
@@ -244,14 +248,18 @@ mypy-extensions==1.0.0
|
||||
# via typing-inspect
|
||||
networkx==3.3
|
||||
# via torch
|
||||
numba==0.60.0
|
||||
# via resampy
|
||||
numpy==1.26.4
|
||||
# via
|
||||
# ctranslate2
|
||||
# langchain
|
||||
# langchain-community
|
||||
# numba
|
||||
# onnxruntime
|
||||
# pipecat-ai (pyproject.toml)
|
||||
# pyloudnorm
|
||||
# resampy
|
||||
# scipy
|
||||
# torchvision
|
||||
# transformers
|
||||
@@ -280,20 +288,20 @@ nvidia-cusparse-cu12==12.1.0.106
|
||||
# torch
|
||||
nvidia-nccl-cu12==2.20.5
|
||||
# via torch
|
||||
nvidia-nvjitlink-cu12==12.5.40
|
||||
nvidia-nvjitlink-cu12==12.5.82
|
||||
# via
|
||||
# nvidia-cusolver-cu12
|
||||
# nvidia-cusparse-cu12
|
||||
nvidia-nvtx-cu12==12.1.105
|
||||
# via torch
|
||||
onnxruntime==1.18.0
|
||||
onnxruntime==1.18.1
|
||||
# via faster-whisper
|
||||
openai==1.26.0
|
||||
openai==1.27.0
|
||||
# via
|
||||
# langchain-openai
|
||||
# openpipe
|
||||
# pipecat-ai (pyproject.toml)
|
||||
openpipe==4.14.0
|
||||
openpipe==4.16.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
orjson==3.10.5
|
||||
# via
|
||||
@@ -336,7 +344,7 @@ pyasn1-modules==0.4.0
|
||||
# via google-auth
|
||||
pyaudio==0.2.14
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
pydantic==2.7.4
|
||||
pydantic==2.8.0
|
||||
# via
|
||||
# anthropic
|
||||
# fastapi
|
||||
@@ -345,7 +353,7 @@ pydantic==2.7.4
|
||||
# langchain-core
|
||||
# langsmith
|
||||
# openai
|
||||
pydantic-core==2.18.4
|
||||
pydantic-core==2.20.0
|
||||
# via pydantic
|
||||
pygments==2.18.0
|
||||
# via rich
|
||||
@@ -392,6 +400,8 @@ requests==2.32.3
|
||||
# pyht
|
||||
# tiktoken
|
||||
# transformers
|
||||
resampy==0.4.3
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
rich==13.7.1
|
||||
# via typer
|
||||
rsa==4.9
|
||||
|
||||
@@ -17,7 +17,7 @@ aiosignal==1.3.1
|
||||
# via aiohttp
|
||||
annotated-types==0.7.0
|
||||
# via pydantic
|
||||
anthropic==0.25.9
|
||||
anthropic==0.28.1
|
||||
# via
|
||||
# openpipe
|
||||
# pipecat-ai (pyproject.toml)
|
||||
@@ -36,15 +36,15 @@ attrs==23.2.0
|
||||
# via
|
||||
# aiohttp
|
||||
# openpipe
|
||||
av==12.1.0
|
||||
av==12.2.0
|
||||
# via faster-whisper
|
||||
azure-cognitiveservices-speech==1.37.0
|
||||
azure-cognitiveservices-speech==1.38.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
blinker==1.8.2
|
||||
# via flask
|
||||
cachetools==5.3.3
|
||||
# via google-auth
|
||||
cartesia==1.0.0
|
||||
cartesia==1.0.3
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
certifi==2024.6.2
|
||||
# via
|
||||
@@ -84,13 +84,13 @@ exceptiongroup==1.2.1
|
||||
# via
|
||||
# anyio
|
||||
# pytest
|
||||
fal-client==0.4.0
|
||||
fal-client==0.4.1
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
fastapi==0.111.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
fastapi-cli==0.0.4
|
||||
# via fastapi
|
||||
faster-whisper==1.0.2
|
||||
faster-whisper==1.0.3
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
filelock==3.15.4
|
||||
# via
|
||||
@@ -110,22 +110,22 @@ frozenlist==1.4.1
|
||||
# via
|
||||
# aiohttp
|
||||
# aiosignal
|
||||
fsspec==2024.6.0
|
||||
fsspec==2024.6.1
|
||||
# via
|
||||
# huggingface-hub
|
||||
# torch
|
||||
future==1.0.0
|
||||
# via pyloudnorm
|
||||
google-ai-generativelanguage==0.6.4
|
||||
google-ai-generativelanguage==0.6.6
|
||||
# via google-generativeai
|
||||
google-api-core[grpc]==2.19.1
|
||||
# via
|
||||
# google-ai-generativelanguage
|
||||
# google-api-python-client
|
||||
# google-generativeai
|
||||
google-api-python-client==2.134.0
|
||||
google-api-python-client==2.135.0
|
||||
# via google-generativeai
|
||||
google-auth==2.30.0
|
||||
google-auth==2.31.0
|
||||
# via
|
||||
# google-ai-generativelanguage
|
||||
# google-api-core
|
||||
@@ -134,7 +134,7 @@ google-auth==2.30.0
|
||||
# google-generativeai
|
||||
google-auth-httplib2==0.2.0
|
||||
# via google-api-python-client
|
||||
google-generativeai==0.5.4
|
||||
google-generativeai==0.7.1
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
googleapis-common-protos==1.63.2
|
||||
# via
|
||||
@@ -194,17 +194,19 @@ jinja2==3.1.4
|
||||
# fastapi
|
||||
# flask
|
||||
# torch
|
||||
jiter==0.5.0
|
||||
# via anthropic
|
||||
jsonpatch==1.33
|
||||
# via langchain-core
|
||||
jsonpointer==3.0.0
|
||||
# via jsonpatch
|
||||
langchain==0.2.5
|
||||
langchain==0.2.6
|
||||
# via
|
||||
# langchain-community
|
||||
# pipecat-ai (pyproject.toml)
|
||||
langchain-community==0.2.5
|
||||
langchain-community==0.2.6
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
langchain-core==0.2.9
|
||||
langchain-core==0.2.10
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
@@ -212,13 +214,15 @@ langchain-core==0.2.9
|
||||
# langchain-text-splitters
|
||||
langchain-openai==0.1.10
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
langchain-text-splitters==0.2.1
|
||||
langchain-text-splitters==0.2.2
|
||||
# via langchain
|
||||
langsmith==0.1.82
|
||||
langsmith==0.1.83
|
||||
# via
|
||||
# langchain
|
||||
# langchain-community
|
||||
# langchain-core
|
||||
llvmlite==0.43.0
|
||||
# via numba
|
||||
loguru==0.7.2
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
markdown-it-py==3.0.0
|
||||
@@ -241,25 +245,29 @@ mypy-extensions==1.0.0
|
||||
# via typing-inspect
|
||||
networkx==3.3
|
||||
# via torch
|
||||
numba==0.60.0
|
||||
# via resampy
|
||||
numpy==1.26.4
|
||||
# via
|
||||
# ctranslate2
|
||||
# langchain
|
||||
# langchain-community
|
||||
# numba
|
||||
# onnxruntime
|
||||
# pipecat-ai (pyproject.toml)
|
||||
# pyloudnorm
|
||||
# resampy
|
||||
# scipy
|
||||
# torchvision
|
||||
# transformers
|
||||
onnxruntime==1.18.0
|
||||
onnxruntime==1.18.1
|
||||
# via faster-whisper
|
||||
openai==1.26.0
|
||||
openai==1.27.0
|
||||
# via
|
||||
# langchain-openai
|
||||
# openpipe
|
||||
# pipecat-ai (pyproject.toml)
|
||||
openpipe==4.14.0
|
||||
openpipe==4.16.0
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
orjson==3.10.5
|
||||
# via
|
||||
@@ -302,7 +310,7 @@ pyasn1-modules==0.4.0
|
||||
# via google-auth
|
||||
pyaudio==0.2.14
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
pydantic==2.7.4
|
||||
pydantic==2.8.0
|
||||
# via
|
||||
# anthropic
|
||||
# fastapi
|
||||
@@ -311,7 +319,7 @@ pydantic==2.7.4
|
||||
# langchain-core
|
||||
# langsmith
|
||||
# openai
|
||||
pydantic-core==2.18.4
|
||||
pydantic-core==2.20.0
|
||||
# via pydantic
|
||||
pygments==2.18.0
|
||||
# via rich
|
||||
@@ -358,6 +366,8 @@ requests==2.32.3
|
||||
# pyht
|
||||
# tiktoken
|
||||
# transformers
|
||||
resampy==0.4.3
|
||||
# via pipecat-ai (pyproject.toml)
|
||||
rich==13.7.1
|
||||
# via typer
|
||||
rsa==4.9
|
||||
|
||||
@@ -8,7 +8,7 @@ dynamic = ["version"]
|
||||
description = "An open source framework for voice (and multimodal) assistants"
|
||||
license = { text = "BSD 2-Clause License" }
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.7"
|
||||
requires-python = ">=3.10"
|
||||
keywords = ["webrtc", "audio", "video", "ai"]
|
||||
classifiers = [
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
@@ -34,24 +34,26 @@ Source = "https://github.com/pipecat-ai/pipecat"
|
||||
Website = "https://pipecat.ai"
|
||||
|
||||
[project.optional-dependencies]
|
||||
anthropic = [ "anthropic~=0.25.7" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.37.0" ]
|
||||
cartesia = [ "cartesia~=1.0.0" ]
|
||||
anthropic = [ "anthropic~=0.28.1" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.38.0" ]
|
||||
cartesia = [ "websockets~=12.0" ]
|
||||
daily = [ "daily-python~=0.10.1" ]
|
||||
deepgram = [ "deepgram-sdk~=3.2.7" ]
|
||||
examples = [ "python-dotenv~=1.0.0", "flask~=3.0.3", "flask_cors~=4.0.1" ]
|
||||
fal = [ "fal-client~=0.4.0" ]
|
||||
google = [ "google-generativeai~=0.5.3" ]
|
||||
fireworks = [ "openai~=1.26.0" ]
|
||||
langchain = [ "langchain~=0.2.1", "langchain-community~=0.2.1", "langchain-openai~=0.1.8" ]
|
||||
fal = [ "fal-client~=0.4.1" ]
|
||||
gladia = [ "websockets~=12.0" ]
|
||||
google = [ "google-generativeai~=0.7.1" ]
|
||||
fireworks = [ "openai~=1.27.0" ]
|
||||
langchain = [ "langchain~=0.2.6", "langchain-community~=0.2.6", "langchain-openai~=0.1.10" ]
|
||||
local = [ "pyaudio~=0.2.0" ]
|
||||
moondream = [ "einops~=0.8.0", "timm~=0.9.16", "transformers~=4.40.2" ]
|
||||
openai = [ "openai~=1.26.0" ]
|
||||
openpipe = [ "openpipe~=4.14.0" ]
|
||||
openai = [ "openai~=1.27.0" ]
|
||||
openpipe = [ "openpipe~=4.16.0" ]
|
||||
playht = [ "pyht~=0.0.28" ]
|
||||
silero = [ "torch~=2.3.0", "torchaudio~=2.3.0" ]
|
||||
silero = [ "torch~=2.3.1", "torchaudio~=2.3.1" ]
|
||||
websocket = [ "websockets~=12.0", "fastapi~=0.111.0" ]
|
||||
whisper = [ "faster-whisper~=1.0.2" ]
|
||||
whisper = [ "faster-whisper~=1.0.3" ]
|
||||
xtts = [ "resampy~=0.4.3" ]
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
# All the following settings are optional:
|
||||
|
||||
@@ -158,6 +158,34 @@ class LLMMessagesFrame(DataFrame):
|
||||
messages: List[dict]
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMMessagesAppendFrame(DataFrame):
|
||||
"""A frame containing a list of LLM messages that neeed to be added to the
|
||||
current context.
|
||||
|
||||
"""
|
||||
messages: List[dict]
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMMessagesUpdateFrame(DataFrame):
|
||||
"""A frame containing a list of new LLM messages. These messages will
|
||||
replace the current context LLM messages and should generate a new
|
||||
LLMMessagesFrame.
|
||||
|
||||
"""
|
||||
messages: List[dict]
|
||||
|
||||
|
||||
@dataclass
|
||||
class TTSSpeakFrame(DataFrame):
|
||||
"""A frame that contains a text that should be spoken by the TTS in the
|
||||
pipeline (if any).
|
||||
|
||||
"""
|
||||
text: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class TransportMessageFrame(DataFrame):
|
||||
message: Any
|
||||
@@ -240,12 +268,33 @@ class StopInterruptionFrame(SystemFrame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class BotInterruptionFrame(SystemFrame):
|
||||
"""Emitted by when the bot should be interrupted. This will mainly cause the
|
||||
same actions as if the user interrupted except that the
|
||||
UserStartedSpeakingFrame and UserStoppedSpeakingFrame won't be generated.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class BotSpeakingFrame(SystemFrame):
|
||||
"""Emitted by transport outputs while the bot is still speaking. This can be
|
||||
used, for example, to detect when a user is idle. That is, while the bot is
|
||||
speaking we don't want to trigger any user idle timeout since the user might
|
||||
be listening.
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class MetricsFrame(SystemFrame):
|
||||
"""Emitted by processor that can compute metrics like latencies.
|
||||
"""
|
||||
ttfb: Mapping[str, float]
|
||||
|
||||
ttfb: List[Mapping[str, Any]] | None = None
|
||||
processing: List[Mapping[str, Any]] | None = None
|
||||
|
||||
#
|
||||
# Control frames
|
||||
@@ -271,27 +320,13 @@ class EndFrame(ControlFrame):
|
||||
|
||||
@dataclass
|
||||
class LLMFullResponseStartFrame(ControlFrame):
|
||||
"""Used to indicate the beginning of a full LLM response. Following
|
||||
LLMResponseStartFrame, TextFrame and LLMResponseEndFrame for each sentence
|
||||
until a LLMFullResponseEndFrame."""
|
||||
"""Used to indicate the beginning of an LLM response. Following by one or
|
||||
more TextFrame and a final LLMFullResponseEndFrame."""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMFullResponseEndFrame(ControlFrame):
|
||||
"""Indicates the end of a full LLM response."""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMResponseStartFrame(ControlFrame):
|
||||
"""Used to indicate the beginning of an LLM response. Following TextFrames
|
||||
are part of the LLM response until an LLMResponseEndFrame"""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMResponseEndFrame(ControlFrame):
|
||||
"""Indicates the end of an LLM response."""
|
||||
pass
|
||||
|
||||
@@ -338,3 +373,17 @@ class UserImageRequestFrame(ControlFrame):
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}, user: {self.user_id}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMModelUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new LLM model.
|
||||
"""
|
||||
model: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class TTSVoiceUpdateFrame(ControlFrame):
|
||||
"""A control frame containing a request to update to a new TTS voice.
|
||||
"""
|
||||
voice: str
|
||||
|
||||
@@ -64,7 +64,7 @@ class Pipeline(BasePipeline):
|
||||
services = []
|
||||
for p in self._processors:
|
||||
if isinstance(p, BasePipeline):
|
||||
services += p.processors_with_metrics()
|
||||
services.extend(p.processors_with_metrics())
|
||||
elif p.can_generate_metrics():
|
||||
services.append(p)
|
||||
return services
|
||||
@@ -91,5 +91,7 @@ class Pipeline(BasePipeline):
|
||||
def _link_processors(self):
|
||||
prev = self._processors[0]
|
||||
for curr in self._processors[1:]:
|
||||
prev.set_parent(self)
|
||||
prev.link(curr)
|
||||
prev = curr
|
||||
prev.set_parent(self)
|
||||
|
||||
@@ -15,7 +15,7 @@ from loguru import logger
|
||||
|
||||
class PipelineRunner:
|
||||
|
||||
def __init__(self, name: str | None = None, handle_sigint: bool = True):
|
||||
def __init__(self, *, name: str | None = None, handle_sigint: bool = True):
|
||||
self.id: int = obj_id()
|
||||
self.name: str = name or f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ from loguru import logger
|
||||
class PipelineParams(BaseModel):
|
||||
allow_interruptions: bool = False
|
||||
enable_metrics: bool = False
|
||||
send_initial_empty_metrics: bool = True
|
||||
report_only_initial_ttfb: bool = False
|
||||
|
||||
|
||||
@@ -95,8 +96,9 @@ class PipelineTask:
|
||||
|
||||
def _initial_metrics_frame(self) -> MetricsFrame:
|
||||
processors = self._pipeline.processors_with_metrics()
|
||||
ttfb = dict(zip([p.name for p in processors], [0] * len(processors)))
|
||||
return MetricsFrame(ttfb=ttfb)
|
||||
ttfb = [{"processor": p.name, "value": 0.0} for p in processors]
|
||||
processing = [{"processor": p.name, "value": 0.0} for p in processors]
|
||||
return MetricsFrame(ttfb=ttfb, processing=processing)
|
||||
|
||||
async def _process_down_queue(self):
|
||||
start_frame = StartFrame(
|
||||
@@ -105,7 +107,9 @@ class PipelineTask:
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb
|
||||
)
|
||||
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)
|
||||
await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM)
|
||||
|
||||
if self._params.send_initial_empty_metrics:
|
||||
await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM)
|
||||
|
||||
running = True
|
||||
should_cleanup = True
|
||||
|
||||
@@ -14,9 +14,9 @@ from pipecat.frames.frames import (
|
||||
InterimTranscriptionFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMResponseEndFrame,
|
||||
LLMResponseStartFrame,
|
||||
LLMMessagesAppendFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMMessagesUpdateFrame,
|
||||
StartInterruptionFrame,
|
||||
TranscriptionFrame,
|
||||
TextFrame,
|
||||
@@ -122,6 +122,19 @@ class LLMResponseAggregator(FrameProcessor):
|
||||
# Reset anyways
|
||||
self._reset()
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, LLMMessagesAppendFrame):
|
||||
self._messages.extend(frame.messages)
|
||||
messages_frame = LLMMessagesFrame(self._messages)
|
||||
await self.push_frame(messages_frame)
|
||||
elif isinstance(frame, LLMMessagesUpdateFrame):
|
||||
# We push the frame downstream so the assistant aggregator gets
|
||||
# updated as well.
|
||||
await self.push_frame(frame)
|
||||
# We can now reset this one.
|
||||
self._reset()
|
||||
self._messages = frame.messages
|
||||
messages_frame = LLMMessagesFrame(self._messages)
|
||||
await self.push_frame(messages_frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -173,7 +186,7 @@ class LLMUserResponseAggregator(LLMResponseAggregator):
|
||||
|
||||
class LLMFullResponseAggregator(FrameProcessor):
|
||||
"""This class aggregates Text frames until it receives a
|
||||
LLMResponseEndFrame, then emits the concatenated text as
|
||||
LLMFullResponseEndFrame, then emits the concatenated text as
|
||||
a single text frame.
|
||||
|
||||
given the following frames:
|
||||
@@ -182,12 +195,12 @@ class LLMFullResponseAggregator(FrameProcessor):
|
||||
TextFrame(" world.")
|
||||
TextFrame(" I am")
|
||||
TextFrame(" an LLM.")
|
||||
LLMResponseEndFrame()]
|
||||
LLMFullResponseEndFrame()]
|
||||
|
||||
this processor will yield nothing for the first 4 frames, then
|
||||
|
||||
TextFrame("Hello, world. I am an LLM.")
|
||||
LLMResponseEndFrame()
|
||||
LLMFullResponseEndFrame()
|
||||
|
||||
when passed the last frame.
|
||||
|
||||
@@ -203,9 +216,9 @@ class LLMFullResponseAggregator(FrameProcessor):
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame(" world.")))
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame(" I am")))
|
||||
>>> asyncio.run(print_frames(aggregator, TextFrame(" an LLM.")))
|
||||
>>> asyncio.run(print_frames(aggregator, LLMResponseEndFrame()))
|
||||
>>> asyncio.run(print_frames(aggregator, LLMFullResponseEndFrame()))
|
||||
Hello, world. I am an LLM.
|
||||
LLMResponseEndFrame
|
||||
LLMFullResponseEndFrame
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
@@ -234,6 +247,11 @@ class LLMContextAggregator(LLMResponseAggregator):
|
||||
async def _push_aggregation(self):
|
||||
if len(self._aggregation) > 0:
|
||||
self._context.add_message({"role": self._role, "content": self._aggregation})
|
||||
|
||||
# Reset the aggregation. Reset it before pushing it down, otherwise
|
||||
# if the tasks gets cancelled we won't be able to clear things up.
|
||||
self._aggregation = ""
|
||||
|
||||
frame = OpenAILLMContextFrame(self._context)
|
||||
await self.push_frame(frame)
|
||||
|
||||
@@ -247,9 +265,10 @@ class LLMAssistantContextAggregator(LLMContextAggregator):
|
||||
messages=[],
|
||||
context=context,
|
||||
role="assistant",
|
||||
start_frame=LLMResponseStartFrame,
|
||||
end_frame=LLMResponseEndFrame,
|
||||
accumulator_frame=TextFrame
|
||||
start_frame=LLMFullResponseStartFrame,
|
||||
end_frame=LLMFullResponseEndFrame,
|
||||
accumulator_frame=TextFrame,
|
||||
handle_interruptions=True
|
||||
)
|
||||
|
||||
|
||||
|
||||
64
src/pipecat/processors/async_frame_processor.py
Normal file
64
src/pipecat/processors/async_frame_processor.py
Normal file
@@ -0,0 +1,64 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from pipecat.frames.frames import EndFrame, Frame, StartInterruptionFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
class AsyncFrameProcessor(FrameProcessor):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str | None = None,
|
||||
loop: asyncio.AbstractEventLoop | None = None,
|
||||
**kwargs):
|
||||
super().__init__(name=name, loop=loop, **kwargs)
|
||||
|
||||
self._create_push_task()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
await self._handle_interruptions(frame)
|
||||
|
||||
async def queue_frame(
|
||||
self,
|
||||
frame: Frame,
|
||||
direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
await self._push_queue.put((frame, direction))
|
||||
|
||||
async def cleanup(self):
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
|
||||
async def _handle_interruptions(self, frame: Frame):
|
||||
# Cancel the task. This will stop pushing frames downstream.
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
# Push an out-of-band frame (i.e. not using the ordered push
|
||||
# frame task).
|
||||
await self.push_frame(frame)
|
||||
# Create a new queue and task.
|
||||
self._create_push_task()
|
||||
|
||||
def _create_push_task(self):
|
||||
self._push_queue = asyncio.Queue()
|
||||
self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
|
||||
|
||||
async def _push_frame_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
(frame, direction) = await self._push_queue.get()
|
||||
await self.push_frame(frame, direction)
|
||||
running = not isinstance(frame, EndFrame)
|
||||
self._push_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
@@ -82,5 +82,5 @@ class WakeCheckFilter(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
except Exception as e:
|
||||
error_msg = f"Error in wake word filter: {e}"
|
||||
logger.error(error_msg)
|
||||
logger.exception(error_msg)
|
||||
await self.push_error(ErrorFrame(error_msg))
|
||||
|
||||
@@ -9,7 +9,7 @@ import time
|
||||
|
||||
from enum import Enum
|
||||
|
||||
from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame, UserStoppedSpeakingFrame
|
||||
from pipecat.frames.frames import ErrorFrame, Frame, MetricsFrame, StartFrame, StartInterruptionFrame, UserStoppedSpeakingFrame
|
||||
from pipecat.utils.utils import obj_count, obj_id
|
||||
|
||||
from loguru import logger
|
||||
@@ -20,15 +20,59 @@ class FrameDirection(Enum):
|
||||
UPSTREAM = 2
|
||||
|
||||
|
||||
class FrameProcessorMetrics:
|
||||
def __init__(self, name: str):
|
||||
self._name = name
|
||||
self._start_ttfb_time = 0
|
||||
self._start_processing_time = 0
|
||||
self._should_report_ttfb = True
|
||||
|
||||
async def start_ttfb_metrics(self, report_only_initial_ttfb):
|
||||
if self._should_report_ttfb:
|
||||
self._start_ttfb_time = time.time()
|
||||
self._should_report_ttfb = not report_only_initial_ttfb
|
||||
|
||||
async def stop_ttfb_metrics(self):
|
||||
if self._start_ttfb_time == 0:
|
||||
return None
|
||||
|
||||
value = time.time() - self._start_ttfb_time
|
||||
logger.debug(f"{self._name} TTFB: {value}")
|
||||
ttfb = {
|
||||
"processor": self._name,
|
||||
"value": value
|
||||
}
|
||||
self._start_ttfb_time = 0
|
||||
return MetricsFrame(ttfb=[ttfb])
|
||||
|
||||
async def start_processing_metrics(self):
|
||||
self._start_processing_time = time.time()
|
||||
|
||||
async def stop_processing_metrics(self):
|
||||
if self._start_processing_time == 0:
|
||||
return None
|
||||
|
||||
value = time.time() - self._start_processing_time
|
||||
logger.debug(f"{self._name} processing time: {value}")
|
||||
processing = {
|
||||
"processor": self._name,
|
||||
"value": value
|
||||
}
|
||||
self._start_processing_time = 0
|
||||
return MetricsFrame(processing=[processing])
|
||||
|
||||
|
||||
class FrameProcessor:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str | None = None,
|
||||
loop: asyncio.AbstractEventLoop | None = None,
|
||||
**kwargs):
|
||||
self.id: int = obj_id()
|
||||
self.name = name or f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self._parent: "FrameProcessor" | None = None
|
||||
self._prev: "FrameProcessor" | None = None
|
||||
self._next: "FrameProcessor" | None = None
|
||||
self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop()
|
||||
@@ -39,8 +83,7 @@ class FrameProcessor:
|
||||
self._report_only_initial_ttfb = False
|
||||
|
||||
# Metrics
|
||||
self._start_ttfb_time = 0
|
||||
self._should_report_ttfb = True
|
||||
self._metrics = FrameProcessorMetrics(name=self.name)
|
||||
|
||||
@property
|
||||
def interruptions_allowed(self):
|
||||
@@ -58,21 +101,33 @@ class FrameProcessor:
|
||||
return False
|
||||
|
||||
async def start_ttfb_metrics(self):
|
||||
if self.metrics_enabled and self._should_report_ttfb:
|
||||
self._start_ttfb_time = time.time()
|
||||
self._should_report_ttfb = not self._report_only_initial_ttfb
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
await self._metrics.start_ttfb_metrics(self._report_only_initial_ttfb)
|
||||
|
||||
async def stop_ttfb_metrics(self):
|
||||
if self.metrics_enabled and self._start_ttfb_time > 0:
|
||||
ttfb = time.time() - self._start_ttfb_time
|
||||
logger.debug(f"{self.name} TTFB: {ttfb}")
|
||||
await self.push_frame(MetricsFrame(ttfb={self.name: ttfb}))
|
||||
self._start_ttfb_time = 0
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
frame = await self._metrics.stop_ttfb_metrics()
|
||||
if frame:
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def start_processing_metrics(self):
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
await self._metrics.start_processing_metrics()
|
||||
|
||||
async def stop_processing_metrics(self):
|
||||
if self.can_generate_metrics() and self.metrics_enabled:
|
||||
frame = await self._metrics.stop_processing_metrics()
|
||||
if frame:
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def stop_all_metrics(self):
|
||||
await self.stop_ttfb_metrics()
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
async def cleanup(self):
|
||||
pass
|
||||
|
||||
def link(self, processor: 'FrameProcessor'):
|
||||
def link(self, processor: "FrameProcessor"):
|
||||
self._next = processor
|
||||
processor._prev = self
|
||||
logger.debug(f"Linking {self} -> {self._next}")
|
||||
@@ -80,11 +135,19 @@ class FrameProcessor:
|
||||
def get_event_loop(self) -> asyncio.AbstractEventLoop:
|
||||
return self._loop
|
||||
|
||||
def set_parent(self, parent: "FrameProcessor"):
|
||||
self._parent = parent
|
||||
|
||||
def get_parent(self) -> "FrameProcessor":
|
||||
return self._parent
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if isinstance(frame, StartFrame):
|
||||
self._allow_interruptions = frame.allow_interruptions
|
||||
self._enable_metrics = frame.enable_metrics
|
||||
self._report_only_initial_ttfb = frame.report_only_initial_ttfb
|
||||
elif isinstance(frame, StartInterruptionFrame):
|
||||
await self.stop_all_metrics()
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
self._should_report_ttfb = True
|
||||
|
||||
@@ -92,12 +155,15 @@ class FrameProcessor:
|
||||
await self.push_frame(error, FrameDirection.UPSTREAM)
|
||||
|
||||
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
|
||||
if direction == FrameDirection.DOWNSTREAM and self._next:
|
||||
logger.trace(f"Pushing {frame} from {self} to {self._next}")
|
||||
await self._next.process_frame(frame, direction)
|
||||
elif direction == FrameDirection.UPSTREAM and self._prev:
|
||||
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
|
||||
await self._prev.process_frame(frame, direction)
|
||||
try:
|
||||
if direction == FrameDirection.DOWNSTREAM and self._next:
|
||||
logger.trace(f"Pushing {frame} from {self} to {self._next}")
|
||||
await self._next.process_frame(frame, direction)
|
||||
elif direction == FrameDirection.UPSTREAM and self._prev:
|
||||
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
|
||||
await self._prev.process_frame(frame, direction)
|
||||
except Exception as e:
|
||||
logger.exception(f"Uncaught exception in {self}: {e}")
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
@@ -11,8 +11,6 @@ from pipecat.frames.frames import (
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMResponseEndFrame,
|
||||
LLMResponseStartFrame,
|
||||
TextFrame)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
@@ -69,11 +67,10 @@ class LangchainProcessor(FrameProcessor):
|
||||
{self._transcript_key: text},
|
||||
config={"configurable": {"session_id": self._participant_id}},
|
||||
):
|
||||
await self.push_frame(LLMResponseStartFrame())
|
||||
await self.push_frame(TextFrame(self.__get_token_value(token)))
|
||||
await self.push_frame(LLMResponseEndFrame())
|
||||
except GeneratorExit:
|
||||
logger.warning(f"{self} generator was closed prematurely")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} an unknown error occurred: {e}")
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
logger.exception(f"{self} an unknown error occurred: {e}")
|
||||
finally:
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
531
src/pipecat/processors/frameworks/rtvi.py
Normal file
531
src/pipecat/processors/frameworks/rtvi.py
Normal file
@@ -0,0 +1,531 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import dataclasses
|
||||
|
||||
from typing import List, Literal, Optional, Type
|
||||
from pydantic import BaseModel, ValidationError
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotInterruptionFrame,
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesAppendFrame,
|
||||
LLMMessagesUpdateFrame,
|
||||
LLMModelUpdateFrame,
|
||||
MetricsFrame,
|
||||
StartFrame,
|
||||
SystemFrame,
|
||||
TTSSpeakFrame,
|
||||
TTSVoiceUpdateFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
TransportMessageFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator, LLMUserResponseAggregator)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.ai_services import AIService
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService, OpenAILLMContext
|
||||
from pipecat.transports.base_transport import BaseTransport
|
||||
|
||||
DEFAULT_MESSAGES = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
|
||||
}
|
||||
]
|
||||
|
||||
DEFAULT_MODEL = "llama3-70b-8192"
|
||||
|
||||
DEFAULT_VOICE = "79a125e8-cd45-4c13-8a67-188112f4dd22"
|
||||
|
||||
|
||||
class RTVILLMConfig(BaseModel):
|
||||
model: Optional[str] = None
|
||||
messages: Optional[List[dict]] = None
|
||||
|
||||
|
||||
class RTVITTSConfig(BaseModel):
|
||||
voice: Optional[str] = None
|
||||
|
||||
|
||||
class RTVIConfig(BaseModel):
|
||||
llm: Optional[RTVILLMConfig] = None
|
||||
tts: Optional[RTVITTSConfig] = None
|
||||
|
||||
|
||||
class RTVISetup(BaseModel):
|
||||
config: Optional[RTVIConfig] = None
|
||||
|
||||
|
||||
class RTVILLMMessageData(BaseModel):
|
||||
messages: List[dict]
|
||||
|
||||
|
||||
class RTVITTSMessageData(BaseModel):
|
||||
text: str
|
||||
interrupt: Optional[bool] = False
|
||||
|
||||
|
||||
class RTVIMessageData(BaseModel):
|
||||
setup: Optional[RTVISetup] = None
|
||||
config: Optional[RTVIConfig] = None
|
||||
llm: Optional[RTVILLMMessageData] = None
|
||||
tts: Optional[RTVITTSMessageData] = None
|
||||
|
||||
|
||||
class RTVIMessage(BaseModel):
|
||||
label: Literal["rtvi"] = "rtvi"
|
||||
type: str
|
||||
id: str
|
||||
data: Optional[RTVIMessageData] = None
|
||||
|
||||
|
||||
class RTVIResponseData(BaseModel):
|
||||
success: bool
|
||||
error: Optional[str] = None
|
||||
|
||||
|
||||
class RTVIResponse(BaseModel):
|
||||
label: Literal["rtvi"] = "rtvi"
|
||||
type: Literal["response"] = "response"
|
||||
id: str
|
||||
data: RTVIResponseData
|
||||
|
||||
|
||||
class RTVIErrorData(BaseModel):
|
||||
message: str
|
||||
|
||||
|
||||
class RTVIError(BaseModel):
|
||||
label: Literal["rtvi"] = "rtvi"
|
||||
type: Literal["error"] = "error"
|
||||
data: RTVIErrorData
|
||||
|
||||
|
||||
class RTVILLMContextMessageData(BaseModel):
|
||||
messages: List[dict]
|
||||
|
||||
|
||||
class RTVILLMContextMessage(BaseModel):
|
||||
label: Literal["rtvi"] = "rtvi"
|
||||
type: Literal["llm-context"] = "llm-context"
|
||||
data: RTVILLMContextMessageData
|
||||
|
||||
|
||||
class RTVITTSTextMessageData(BaseModel):
|
||||
text: str
|
||||
|
||||
|
||||
class RTVITTSTextMessage(BaseModel):
|
||||
label: Literal["rtvi"] = "rtvi"
|
||||
type: Literal["tts-text"] = "tts-text"
|
||||
data: RTVITTSTextMessageData
|
||||
|
||||
|
||||
class RTVIBotReady(BaseModel):
|
||||
label: Literal["rtvi"] = "rtvi"
|
||||
type: Literal["bot-ready"] = "bot-ready"
|
||||
|
||||
|
||||
class RTVITranscriptionMessageData(BaseModel):
|
||||
text: str
|
||||
user_id: str
|
||||
timestamp: str
|
||||
final: bool
|
||||
|
||||
|
||||
class RTVITranscriptionMessage(BaseModel):
|
||||
label: Literal["rtvi"] = "rtvi"
|
||||
type: Literal["user-transcription"] = "user-transcription"
|
||||
data: RTVITranscriptionMessageData
|
||||
|
||||
|
||||
class RTVIUserStartedSpeakingMessage(BaseModel):
|
||||
label: Literal["rtvi"] = "rtvi"
|
||||
type: Literal["user-started-speaking"] = "user-started-speaking"
|
||||
|
||||
|
||||
class RTVIUserStoppedSpeakingMessage(BaseModel):
|
||||
label: Literal["rtvi"] = "rtvi"
|
||||
type: Literal["user-stopped-speaking"] = "user-stopped-speaking"
|
||||
|
||||
|
||||
class RTVIJSONCompletion(BaseModel):
|
||||
label: Literal["rtvi"] = "rtvi"
|
||||
type: Literal["json-completion"] = "json-completion"
|
||||
data: str
|
||||
|
||||
|
||||
class FunctionCaller(FrameProcessor):
|
||||
|
||||
def __init__(self, context):
|
||||
super().__init__()
|
||||
self._checking = False
|
||||
self._aggregating = False
|
||||
self._emitted_start = False
|
||||
self._aggregation = ""
|
||||
self._context = context
|
||||
|
||||
self._callbacks = {}
|
||||
self._start_callbacks = {}
|
||||
|
||||
def register_function(self, function_name: str, callback, start_callback=None):
|
||||
self._callbacks[function_name] = callback
|
||||
if start_callback:
|
||||
self._start_callbacks[function_name] = start_callback
|
||||
|
||||
def unregister_function(self, function_name: str):
|
||||
del self._callbacks[function_name]
|
||||
if self._start_callbacks[function_name]:
|
||||
del self._start_callbacks[function_name]
|
||||
|
||||
def has_function(self, function_name: str):
|
||||
return function_name in self._callbacks.keys()
|
||||
|
||||
async def call_function(self, function_name: str, args):
|
||||
if function_name in self._callbacks.keys():
|
||||
return await self._callbacks[function_name](self, args)
|
||||
return None
|
||||
|
||||
async def call_start_function(self, function_name: str):
|
||||
if function_name in self._start_callbacks.keys():
|
||||
await self._start_callbacks[function_name](self)
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, LLMFullResponseStartFrame):
|
||||
self._checking = True
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, TextFrame) and self._checking:
|
||||
# TODO-CB: should we expand this to any non-text character to start the completion?
|
||||
if frame.text.strip().startswith("{") or frame.text.strip().startswith("```"):
|
||||
self._emitted_start = False
|
||||
self._checking = False
|
||||
self._aggregation = frame.text
|
||||
self._aggregating = True
|
||||
else:
|
||||
self._checking = False
|
||||
self._aggregating = False
|
||||
self._aggregation = ""
|
||||
self._emitted_start = False
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, TextFrame) and self._aggregating:
|
||||
self._aggregation += frame.text
|
||||
# TODO-CB: We can probably ignore function start I think
|
||||
# if not self._emitted_start:
|
||||
# fn = re.search(r'{"function_name":\s*"(.*)",', self._aggregation)
|
||||
# if fn and fn.group(1):
|
||||
# await self.call_start_function(fn.group(1))
|
||||
# self._emitted_start = True
|
||||
elif isinstance(frame, LLMFullResponseEndFrame) and self._aggregating:
|
||||
try:
|
||||
self._aggregation = self._aggregation.replace("```json", "").replace("```", "")
|
||||
self._context.add_message({"role": "assistant", "content": self._aggregation})
|
||||
message = RTVIJSONCompletion(data=self._aggregation)
|
||||
msg = message.model_dump(exclude_none=True)
|
||||
await self.push_frame(TransportMessageFrame(message=msg))
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error parsing function call json: {e}")
|
||||
print(f"aggregation was: {self._aggregation}")
|
||||
|
||||
self._aggregating = False
|
||||
self._aggregation = ""
|
||||
self._emitted_start = False
|
||||
elif isinstance(frame, LLMFullResponseEndFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class RTVITTSTextProcessor(FrameProcessor):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, TextFrame):
|
||||
message = RTVITTSTextMessage(data=RTVITTSTextMessageData(text=frame.text))
|
||||
await self.push_frame(TransportMessageFrame(message=message.model_dump(exclude_none=True)))
|
||||
|
||||
|
||||
class RTVIProcessor(FrameProcessor):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
transport: BaseTransport,
|
||||
setup: RTVISetup | None = None,
|
||||
llm_api_key: str = "",
|
||||
llm_base_url: str = "https://api.groq.com/openai/v1",
|
||||
tts_api_key: str = "",
|
||||
llm_cls: Type[AIService] = OpenAILLMService,
|
||||
tts_cls: Type[AIService] = CartesiaTTSService):
|
||||
super().__init__()
|
||||
self._transport = transport
|
||||
self._setup = setup
|
||||
self._llm_api_key = llm_api_key
|
||||
self._llm_base_url = llm_base_url
|
||||
self._tts_api_key = tts_api_key
|
||||
self._llm_cls = llm_cls
|
||||
self._tts_cls = tts_cls
|
||||
self._start_frame: Frame | None = None
|
||||
self._llm: FrameProcessor | None = None
|
||||
self._tts: FrameProcessor | None = None
|
||||
self._pipeline: FrameProcessor | None = None
|
||||
|
||||
self._frame_handler_task = self.get_event_loop().create_task(self._frame_handler())
|
||||
self._frame_queue = asyncio.Queue()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, SystemFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self._frame_queue.put((frame, direction))
|
||||
|
||||
if isinstance(frame, StartFrame):
|
||||
self._start_frame = frame
|
||||
try:
|
||||
await self._handle_setup(self._setup)
|
||||
except Exception as e:
|
||||
await self._send_error(f"unable to setup RTVI: {e}")
|
||||
|
||||
async def cleanup(self):
|
||||
self._frame_handler_task.cancel()
|
||||
await self._frame_handler_task
|
||||
|
||||
async def _frame_handler(self):
|
||||
while True:
|
||||
try:
|
||||
(frame, direction) = await self._frame_queue.get()
|
||||
await self._handle_frame(frame, direction)
|
||||
self._frame_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
async def _handle_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if isinstance(frame, TransportMessageFrame):
|
||||
await self._handle_message(frame)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, TranscriptionFrame) or isinstance(frame, InterimTranscriptionFrame):
|
||||
await self._handle_transcriptions(frame)
|
||||
elif isinstance(frame, UserStartedSpeakingFrame) or isinstance(frame, UserStoppedSpeakingFrame):
|
||||
await self._handle_interruptions(frame)
|
||||
|
||||
async def _handle_transcriptions(self, frame: Frame):
|
||||
# TODO(aleix): Once we add support for using custom piplines, the STTs will
|
||||
# be in the pipeline after this processor. This means the STT will have to
|
||||
# push transcriptions upstream as well.
|
||||
|
||||
message = None
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
message = RTVITranscriptionMessage(
|
||||
data=RTVITranscriptionMessageData(
|
||||
text=frame.text,
|
||||
user_id=frame.user_id,
|
||||
timestamp=frame.timestamp,
|
||||
final=True))
|
||||
elif isinstance(frame, InterimTranscriptionFrame):
|
||||
message = RTVITranscriptionMessage(
|
||||
data=RTVITranscriptionMessageData(
|
||||
text=frame.text,
|
||||
user_id=frame.user_id,
|
||||
timestamp=frame.timestamp,
|
||||
final=False))
|
||||
|
||||
if message:
|
||||
frame = TransportMessageFrame(message=message.model_dump(exclude_none=True))
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_interruptions(self, frame: Frame):
|
||||
message = None
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
message = RTVIUserStartedSpeakingMessage()
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
message = RTVIUserStoppedSpeakingMessage()
|
||||
|
||||
if message:
|
||||
frame = TransportMessageFrame(message=message.model_dump(exclude_none=True))
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_message(self, frame: TransportMessageFrame):
|
||||
try:
|
||||
message = RTVIMessage.model_validate(frame.message)
|
||||
except ValidationError as e:
|
||||
await self._send_error(f"invalid message: {e}")
|
||||
return
|
||||
|
||||
try:
|
||||
success = True
|
||||
error = None
|
||||
match message.type:
|
||||
case "setup":
|
||||
setup = None
|
||||
if message.data:
|
||||
setup = message.data.setup
|
||||
await self._handle_setup(message.id, setup)
|
||||
case "config-update":
|
||||
await self._handle_config_update(message.data.config)
|
||||
case "llm-get-context":
|
||||
await self._handle_llm_get_context()
|
||||
case "llm-append-context":
|
||||
await self._handle_llm_append_context(message.data.llm)
|
||||
case "llm-update-context":
|
||||
await self._handle_llm_update_context(message.data.llm)
|
||||
case "tts-speak":
|
||||
await self._handle_tts_speak(message.data.tts)
|
||||
case "tts-interrupt":
|
||||
await self._handle_tts_interrupt()
|
||||
case _:
|
||||
success = False
|
||||
error = f"unsupported type {message.type}"
|
||||
|
||||
await self._send_response(message.id, success, error)
|
||||
except ValidationError as e:
|
||||
await self._send_response(message.id, False, f"invalid message: {e}")
|
||||
except Exception as e:
|
||||
await self._send_response(message.id, False, f"{e}")
|
||||
|
||||
async def _handle_setup(self, setup: RTVISetup | None):
|
||||
model = DEFAULT_MODEL
|
||||
if setup and setup.config and setup.config.llm and setup.config.llm.model:
|
||||
model = setup.config.llm.model
|
||||
|
||||
messages = DEFAULT_MESSAGES
|
||||
if setup and setup.config and setup.config.llm and setup.config.llm.messages:
|
||||
messages = setup.config.llm.messages
|
||||
|
||||
voice = DEFAULT_VOICE
|
||||
if setup and setup.config and setup.config.tts and setup.config.tts.voice:
|
||||
voice = setup.config.tts.voice
|
||||
|
||||
self._tma_in = LLMUserResponseAggregator(messages)
|
||||
self._tma_out = LLMAssistantResponseAggregator(messages)
|
||||
|
||||
self._llm = self._llm_cls(
|
||||
name="LLM",
|
||||
base_url=self._llm_base_url,
|
||||
api_key=self._llm_api_key,
|
||||
model=model)
|
||||
|
||||
self._tts = self._tts_cls(name="TTS", api_key=self._tts_api_key, voice_id=voice)
|
||||
|
||||
# TODO-CB: Eventually we'll need to switch the context aggregators to use the
|
||||
# OpenAI context frames instead of message frames
|
||||
context = OpenAILLMContext(messages=messages)
|
||||
self._fc = FunctionCaller(context)
|
||||
|
||||
self._tts_text = RTVITTSTextProcessor()
|
||||
|
||||
pipeline = Pipeline([
|
||||
self._tma_in,
|
||||
self._llm,
|
||||
self._fc,
|
||||
self._tts,
|
||||
self._tts_text,
|
||||
self._tma_out,
|
||||
self._transport.output(),
|
||||
])
|
||||
self._pipeline = pipeline
|
||||
|
||||
parent = self.get_parent()
|
||||
if parent and self._start_frame:
|
||||
parent.link(pipeline)
|
||||
|
||||
# We need to initialize the new pipeline with the same settings
|
||||
# as the initial one.
|
||||
start_frame = dataclasses.replace(self._start_frame)
|
||||
await self.push_frame(start_frame)
|
||||
|
||||
# Send new initial metrics with the new processors
|
||||
processors = parent.processors_with_metrics()
|
||||
processors.extend(self._pipeline.processors_with_metrics())
|
||||
ttfb = [{"processor": p.name, "value": 0.0} for p in processors]
|
||||
processing = [{"processor": p.name, "value": 0.0} for p in processors]
|
||||
await self.push_frame(MetricsFrame(ttfb=ttfb, processing=processing))
|
||||
|
||||
message = RTVIBotReady()
|
||||
frame = TransportMessageFrame(message=message.model_dump(exclude_none=True))
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_config_update(self, config: RTVIConfig):
|
||||
# Change voice before LLM updates, so we can hear the new vocie.
|
||||
if config.tts and config.tts.voice:
|
||||
frame = TTSVoiceUpdateFrame(config.tts.voice)
|
||||
await self.push_frame(frame)
|
||||
if config.llm and config.llm.model:
|
||||
frame = LLMModelUpdateFrame(config.llm.model)
|
||||
await self.push_frame(frame)
|
||||
if config.llm and config.llm.messages:
|
||||
frame = LLMMessagesUpdateFrame(config.llm.messages)
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_llm_get_context(self):
|
||||
data = RTVILLMContextMessageData(messages=self._tma_in.messages)
|
||||
message = RTVILLMContextMessage(data=data)
|
||||
frame = TransportMessageFrame(message=message.model_dump(exclude_none=True))
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_llm_append_context(self, data: RTVILLMMessageData):
|
||||
if data and data.messages:
|
||||
frame = LLMMessagesAppendFrame(data.messages)
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_llm_update_context(self, data: RTVILLMMessageData):
|
||||
if data and data.messages:
|
||||
frame = LLMMessagesUpdateFrame(data.messages)
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_tts_speak(self, data: RTVITTSMessageData):
|
||||
if data and data.text:
|
||||
if data.interrupt:
|
||||
await self._handle_tts_interrupt()
|
||||
frame = TTSSpeakFrame(text=data.text)
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _handle_tts_interrupt(self):
|
||||
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
|
||||
|
||||
async def _send_error(self, error: str):
|
||||
message = RTVIError(data=RTVIErrorData(message=error))
|
||||
frame = TransportMessageFrame(message=message.model_dump(exclude_none=True))
|
||||
await self.push_frame(frame)
|
||||
|
||||
async def _send_response(self, id: str, success: bool, error: str | None = None):
|
||||
# TODO(aleix): This is a bit hacky, but we might get invalid
|
||||
# configuration or something might going wrong during setup and we would
|
||||
# like to send the error to the client. However, if the pipeline is not
|
||||
# setup yet we don't have an output transport and therefore we can't
|
||||
# send any messages. So, we setup a super basic pipeline with just the
|
||||
# output transport so we can send messages.
|
||||
if not self._pipeline:
|
||||
pipeline = Pipeline([self._transport.output()])
|
||||
self._pipeline = pipeline
|
||||
|
||||
parent = self.get_parent()
|
||||
if parent and self._start_frame:
|
||||
parent.link(pipeline)
|
||||
|
||||
message = RTVIResponse(id=id, data=RTVIResponseData(success=success, error=error))
|
||||
frame = TransportMessageFrame(message=message.model_dump(exclude_none=True))
|
||||
await self.push_frame(frame)
|
||||
76
src/pipecat/processors/idle_frame_processor.py
Normal file
76
src/pipecat/processors/idle_frame_processor.py
Normal file
@@ -0,0 +1,76 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from typing import Awaitable, Callable, List
|
||||
|
||||
from pipecat.frames.frames import Frame, SystemFrame
|
||||
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
|
||||
|
||||
class IdleFrameProcessor(AsyncFrameProcessor):
|
||||
"""This class waits to receive any frame or list of desired frames within a
|
||||
given timeout. If the timeout is reached before receiving any of those
|
||||
frames the provided callback will be called.
|
||||
|
||||
The callback can then be used to push frames downstream by using
|
||||
`queue_frame()` (or `push_frame()` for system frames).
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
callback: Callable[["IdleFrameProcessor"], Awaitable[None]],
|
||||
timeout: float,
|
||||
types: List[type] = [],
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self._callback = callback
|
||||
self._timeout = timeout
|
||||
self._types = types
|
||||
|
||||
self._create_idle_task()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, SystemFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.queue_frame(frame, direction)
|
||||
|
||||
# If we are not waiting for any specific frame set the event, otherwise
|
||||
# check if we have received one of the desired frames.
|
||||
if not self._types:
|
||||
self._idle_event.set()
|
||||
else:
|
||||
for t in self._types:
|
||||
if isinstance(frame, t):
|
||||
self._idle_event.set()
|
||||
|
||||
# If we are not waiting for any specific frame set the event, otherwise
|
||||
async def cleanup(self):
|
||||
self._idle_task.cancel()
|
||||
await self._idle_task
|
||||
|
||||
def _create_idle_task(self):
|
||||
self._idle_event = asyncio.Event()
|
||||
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
|
||||
|
||||
async def _idle_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
|
||||
except asyncio.TimeoutError:
|
||||
await self._callback(self)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
finally:
|
||||
self._idle_event.clear()
|
||||
@@ -33,6 +33,6 @@ class StatelessTextTransformer(FrameProcessor):
|
||||
result = self._transform_fn(frame.text)
|
||||
if isinstance(result, Coroutine):
|
||||
result = await result
|
||||
await self.push_frame(result)
|
||||
await self.push_frame(TextFrame(text=result))
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
77
src/pipecat/processors/user_idle_processor.py
Normal file
77
src/pipecat/processors/user_idle_processor.py
Normal file
@@ -0,0 +1,77 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from typing import Awaitable, Callable
|
||||
|
||||
from pipecat.frames.frames import BotSpeakingFrame, Frame, StartInterruptionFrame, StopInterruptionFrame, SystemFrame
|
||||
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
|
||||
|
||||
class UserIdleProcessor(AsyncFrameProcessor):
|
||||
"""This class is useful to check if the user is interacting with the bot
|
||||
within a given timeout. If the timeout is reached before any interaction
|
||||
occurred the provided callback will be called.
|
||||
|
||||
The callback can then be used to push frames downstream by using
|
||||
`queue_frame()` (or `push_frame()` for system frames).
|
||||
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
callback: Callable[["UserIdleProcessor"], Awaitable[None]],
|
||||
timeout: float,
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self._callback = callback
|
||||
self._timeout = timeout
|
||||
|
||||
self._interrupted = False
|
||||
|
||||
self._create_idle_task()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, SystemFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.queue_frame(frame, direction)
|
||||
|
||||
# We shouldn't call the idle callback if the user or the bot are speaking.
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
self._interrupted = True
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, StopInterruptionFrame):
|
||||
self._interrupted = False
|
||||
self._idle_event.set()
|
||||
elif isinstance(frame, BotSpeakingFrame):
|
||||
self._idle_event.set()
|
||||
|
||||
async def cleanup(self):
|
||||
self._idle_task.cancel()
|
||||
await self._idle_task
|
||||
|
||||
def _create_idle_task(self):
|
||||
self._idle_event = asyncio.Event()
|
||||
self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
|
||||
|
||||
async def _idle_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
|
||||
except asyncio.TimeoutError:
|
||||
if not self._interrupted:
|
||||
await self._callback(self)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
finally:
|
||||
self._idle_event.clear()
|
||||
@@ -19,14 +19,35 @@ from pipecat.frames.frames import (
|
||||
LLMFullResponseEndFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
TTSSpeakFrame,
|
||||
TTSStartedFrame,
|
||||
TTSStoppedFrame,
|
||||
TTSVoiceUpdateFrame,
|
||||
TextFrame,
|
||||
VisionImageRawFrame,
|
||||
)
|
||||
from pipecat.processors.async_frame_processor import AsyncFrameProcessor
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.utils.audio import calculate_audio_volume
|
||||
from pipecat.utils.utils import exp_smoothing
|
||||
import re
|
||||
|
||||
|
||||
ENDOFSENTENCE_PATTERN_STR = r"""
|
||||
(?<![A-Z]) # Negative lookbehind: not preceded by an uppercase letter (e.g., "U.S.A.")
|
||||
(?<!\d) # Negative lookbehind: not preceded by a digit (e.g., "1. Let's start")
|
||||
(?<!\d\s[ap]) # Negative lookbehind: not preceded by time (e.g., "3:00 a.m.")
|
||||
(?<!Mr|Ms|Dr) # Negative lookbehind: not preceded by Mr, Ms, Dr (combined bc. length is the same)
|
||||
(?<!Mrs) # Negative lookbehind: not preceded by "Mrs"
|
||||
(?<!Prof) # Negative lookbehind: not preceded by "Prof"
|
||||
[\.\?\!:] # Match a period, question mark, exclamation point, or colon
|
||||
$ # End of string
|
||||
"""
|
||||
ENDOFSENTENCE_PATTERN = re.compile(ENDOFSENTENCE_PATTERN_STR, re.VERBOSE)
|
||||
|
||||
|
||||
def match_endofsentence(text: str) -> bool:
|
||||
return ENDOFSENTENCE_PATTERN.search(text.rstrip()) is not None
|
||||
|
||||
|
||||
class AIService(FrameProcessor):
|
||||
@@ -60,6 +81,30 @@ class AIService(FrameProcessor):
|
||||
await self.push_frame(f)
|
||||
|
||||
|
||||
class AsyncAIService(AsyncFrameProcessor):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
pass
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
pass
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
pass
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, StartFrame):
|
||||
await self.start(frame)
|
||||
elif isinstance(frame, CancelFrame):
|
||||
await self.cancel(frame)
|
||||
elif isinstance(frame, EndFrame):
|
||||
await self.stop(frame)
|
||||
|
||||
|
||||
class LLMService(AIService):
|
||||
"""This class is a no-op but serves as a base class for LLM services."""
|
||||
|
||||
@@ -93,11 +138,22 @@ class LLMService(AIService):
|
||||
|
||||
|
||||
class TTSService(AIService):
|
||||
def __init__(self, aggregate_sentences: bool = True, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
aggregate_sentences: bool = True,
|
||||
# if True, subclass is responsible for pushing TextFrames and LLMFullResponseEndFrames
|
||||
push_text_frames: bool = True,
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._aggregate_sentences: bool = aggregate_sentences
|
||||
self._push_text_frames: bool = push_text_frames
|
||||
self._current_sentence: str = ""
|
||||
|
||||
@abstractmethod
|
||||
async def set_voice(self, voice: str):
|
||||
pass
|
||||
|
||||
# Converts the text to audio.
|
||||
@abstractmethod
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
@@ -106,32 +162,37 @@ class TTSService(AIService):
|
||||
async def say(self, text: str):
|
||||
await self.process_frame(TextFrame(text=text), FrameDirection.DOWNSTREAM)
|
||||
|
||||
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
|
||||
self._current_sentence = ""
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _process_text_frame(self, frame: TextFrame):
|
||||
text: str | None = None
|
||||
if not self._aggregate_sentences:
|
||||
text = frame.text
|
||||
else:
|
||||
self._current_sentence += frame.text
|
||||
if self._current_sentence.strip().endswith(
|
||||
(".", "?", "!")) and not self._current_sentence.strip().endswith(
|
||||
("Mr,", "Mrs.", "Ms.", "Dr.")):
|
||||
if match_endofsentence(self._current_sentence):
|
||||
text = self._current_sentence
|
||||
self._current_sentence = ""
|
||||
|
||||
if text:
|
||||
await self._push_tts_frames(text)
|
||||
|
||||
async def _push_tts_frames(self, text: str):
|
||||
async def _push_tts_frames(self, text: str, text_passthrough: bool = True):
|
||||
text = text.strip()
|
||||
if not text:
|
||||
return
|
||||
|
||||
await self.push_frame(TTSStartedFrame())
|
||||
await self.start_processing_metrics()
|
||||
await self.process_generator(self.run_tts(text))
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(TTSStoppedFrame())
|
||||
# We send the original text after the audio. This way, if we are
|
||||
# interrupted, the text is not added to the assistant context.
|
||||
await self.push_frame(TextFrame(text))
|
||||
if self._push_text_frames:
|
||||
# We send the original text after the audio. This way, if we are
|
||||
# interrupted, the text is not added to the assistant context.
|
||||
await self.push_frame(TextFrame(text))
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
@@ -139,12 +200,20 @@ class TTSService(AIService):
|
||||
if isinstance(frame, TextFrame):
|
||||
await self._process_text_frame(frame)
|
||||
elif isinstance(frame, StartInterruptionFrame):
|
||||
self._current_sentence = ""
|
||||
await self.push_frame(frame, direction)
|
||||
await self._handle_interruption(frame, direction)
|
||||
elif isinstance(frame, LLMFullResponseEndFrame) or isinstance(frame, EndFrame):
|
||||
sentence = self._current_sentence
|
||||
self._current_sentence = ""
|
||||
await self._push_tts_frames(self._current_sentence)
|
||||
await self.push_frame(frame)
|
||||
await self._push_tts_frames(sentence)
|
||||
if isinstance(frame, LLMFullResponseEndFrame):
|
||||
if self._push_text_frames:
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, TTSSpeakFrame):
|
||||
await self._push_tts_frames(frame.text, False)
|
||||
elif isinstance(frame, TTSVoiceUpdateFrame):
|
||||
await self.set_voice(frame.voice)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -153,6 +222,7 @@ class STTService(AIService):
|
||||
"""STTService is a base class for speech-to-text services."""
|
||||
|
||||
def __init__(self,
|
||||
*,
|
||||
min_volume: float = 0.6,
|
||||
max_silence_secs: float = 0.3,
|
||||
max_buffer_secs: float = 1.5,
|
||||
@@ -208,7 +278,9 @@ class STTService(AIService):
|
||||
self._silence_num_frames = 0
|
||||
self._wave.close()
|
||||
self._content.seek(0)
|
||||
await self.start_processing_metrics()
|
||||
await self.process_generator(self.run_stt(self._content.read()))
|
||||
await self.stop_processing_metrics()
|
||||
(self._content, self._wave) = self._new_wave()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
@@ -241,7 +313,9 @@ class ImageGenService(AIService):
|
||||
|
||||
if isinstance(frame, TextFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
await self.start_processing_metrics()
|
||||
await self.process_generator(self.run_image_gen(frame.text))
|
||||
await self.stop_processing_metrics()
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -261,6 +335,8 @@ class VisionService(AIService):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, VisionImageRawFrame):
|
||||
await self.start_processing_metrics()
|
||||
await self.process_generator(self.run_vision(frame))
|
||||
await self.stop_processing_metrics()
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
@@ -8,12 +8,11 @@ import base64
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMModelUpdateFrame,
|
||||
TextFrame,
|
||||
VisionImageRawFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMResponseStartFrame,
|
||||
LLMResponseEndFrame,
|
||||
LLMFullResponseEndFrame
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
@@ -41,6 +40,7 @@ class AnthropicLLMService(LLMService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
api_key: str,
|
||||
model: str = "claude-3-opus-20240229",
|
||||
max_tokens: int = 1024):
|
||||
@@ -117,12 +117,10 @@ class AnthropicLLMService(LLMService):
|
||||
async for event in response:
|
||||
# logger.debug(f"Anthropic LLM event: {event}")
|
||||
if (event.type == "content_block_delta"):
|
||||
await self.push_frame(LLMResponseStartFrame())
|
||||
await self.push_frame(TextFrame(event.delta.text))
|
||||
await self.push_frame(LLMResponseEndFrame())
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
finally:
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
@@ -137,6 +135,9 @@ class AnthropicLLMService(LLMService):
|
||||
context = OpenAILLMContext.from_messages(frame.messages)
|
||||
elif isinstance(frame, VisionImageRawFrame):
|
||||
context = OpenAILLMContext.from_image_frame(frame)
|
||||
elif isinstance(frame, LLMModelUpdateFrame):
|
||||
logger.debug(f"Switching LLM model to: [{frame.model}]")
|
||||
self._model = frame.model
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
@@ -19,12 +19,11 @@ from pipecat.frames.frames import (
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
SystemFrame,
|
||||
TranscriptionFrame,
|
||||
URLImageRawFrame)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import AIService, TTSService, ImageGenService
|
||||
from pipecat.services.ai_services import AsyncAIService, TTSService, ImageGenService
|
||||
from pipecat.services.openai import BaseOpenAILLMService
|
||||
|
||||
from loguru import logger
|
||||
@@ -82,8 +81,12 @@ class AzureTTSService(TTSService):
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
async def set_voice(self, voice: str):
|
||||
logger.debug(f"Switching TTS voice to: [{voice}]")
|
||||
self._voice = voice
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: {text}")
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
@@ -110,7 +113,7 @@ class AzureTTSService(TTSService):
|
||||
logger.error(f"{self} error: {cancellation_details.error_details}")
|
||||
|
||||
|
||||
class AzureSTTService(AIService):
|
||||
class AzureSTTService(AsyncAIService):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
@@ -133,18 +136,10 @@ class AzureSTTService(AIService):
|
||||
speech_config=speech_config, audio_config=audio_config)
|
||||
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
|
||||
|
||||
# This event will be used to ignore out-of-band transcriptions while we
|
||||
# are itnerrupted.
|
||||
self._is_interrupted_event = asyncio.Event()
|
||||
|
||||
self._create_push_task()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
await self._handle_interruptions(frame)
|
||||
elif isinstance(frame, SystemFrame):
|
||||
if isinstance(frame, SystemFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, AudioRawFrame):
|
||||
self._audio_stream.write(frame.audio)
|
||||
@@ -156,44 +151,16 @@ class AzureSTTService(AIService):
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
self._speech_recognizer.stop_continuous_recognition_async()
|
||||
await self._push_queue.put((frame, FrameDirection.DOWNSTREAM))
|
||||
await self._push_frame_task
|
||||
self._audio_stream.close()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
self._speech_recognizer.stop_continuous_recognition_async()
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
|
||||
async def _handle_interruptions(self, frame: Frame):
|
||||
# Cancel the task. This will stop pushing frames downstream.
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
# Push an out-of-band frame (i.e. not using the ordered push
|
||||
# frame task).
|
||||
await self.push_frame(frame)
|
||||
# Create a new queue and task.
|
||||
self._create_push_task()
|
||||
|
||||
def _create_push_task(self):
|
||||
self._push_queue = asyncio.Queue()
|
||||
self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
|
||||
|
||||
async def _push_frame_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
(frame, direction) = await self._push_queue.get()
|
||||
await self.push_frame(frame, direction)
|
||||
running = not isinstance(frame, EndFrame)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
self._audio_stream.close()
|
||||
|
||||
def _on_handle_recognized(self, event):
|
||||
if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0:
|
||||
direction = FrameDirection.DOWNSTREAM
|
||||
frame = TranscriptionFrame(event.result.text, "", int(time.time_ns() / 1000000))
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._push_queue.put((frame, direction)), self.get_event_loop())
|
||||
asyncio.run_coroutine_threadsafe(self.queue_frame(frame), self.get_event_loop())
|
||||
|
||||
|
||||
class AzureImageGenServiceREST(ImageGenService):
|
||||
|
||||
@@ -4,15 +4,37 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
from cartesia import AsyncCartesia
|
||||
import json
|
||||
import uuid
|
||||
import base64
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from pipecat.frames.frames import AudioRawFrame, Frame
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
AudioRawFrame,
|
||||
StartInterruptionFrame,
|
||||
StartFrame,
|
||||
EndFrame,
|
||||
TextFrame,
|
||||
LLMFullResponseEndFrame
|
||||
)
|
||||
from pipecat.services.ai_services import TTSService
|
||||
|
||||
from loguru import logger
|
||||
|
||||
# See .env.example for Cartesia configuration needed
|
||||
try:
|
||||
import websockets
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use Cartesia, you need to `pip install pipecat-ai[cartesia]`. Also, set `CARTESIA_API_KEY` environment variable.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class CartesiaTTSService(TTSService):
|
||||
|
||||
@@ -21,45 +43,183 @@ class CartesiaTTSService(TTSService):
|
||||
*,
|
||||
api_key: str,
|
||||
voice_id: str,
|
||||
cartesia_version: str = "2024-06-10",
|
||||
url: str = "wss://api.cartesia.ai/tts/websocket",
|
||||
model_id: str = "sonic-english",
|
||||
encoding: str = "pcm_s16le",
|
||||
sample_rate: int = 16000,
|
||||
language: str = "en",
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# Aggregating sentences still gives cleaner-sounding results and fewer
|
||||
# artifacts than streaming one word at a time. On average, waiting for
|
||||
# a full sentence should only "cost" us 15ms or so with GPT-4o or a Llama 3
|
||||
# model, and it's worth it for the better audio quality.
|
||||
self._aggregate_sentences = True
|
||||
|
||||
# we don't want to automatically push LLM response text frames, because the
|
||||
# context aggregators will add them to the LLM context even if we're
|
||||
# interrupted. cartesia gives us word-by-word timestamps. we can use those
|
||||
# to generate text frames ourselves aligned with the playout timing of the audio!
|
||||
self._push_text_frames = False
|
||||
|
||||
self._api_key = api_key
|
||||
self._cartesia_version = cartesia_version
|
||||
self._url = url
|
||||
self._voice_id = voice_id
|
||||
self._model_id = model_id
|
||||
self._output_format = {
|
||||
"container": "raw",
|
||||
"encoding": encoding,
|
||||
"sample_rate": sample_rate,
|
||||
}
|
||||
self._language = language
|
||||
|
||||
try:
|
||||
self._client = AsyncCartesia(api_key=self._api_key)
|
||||
self._voice = self._client.voices.get(id=voice_id)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
self._context_id = None
|
||||
self._context_id_start_timestamp = None
|
||||
self._timestamped_words_buffer = []
|
||||
self._receive_task = None
|
||||
self._context_appending_task = None
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
async def set_voice(self, voice: str):
|
||||
logger.debug(f"Switching TTS voice to: [{voice}]")
|
||||
self._voice_id = voice
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self._disconnect()
|
||||
|
||||
async def _connect(self):
|
||||
try:
|
||||
self._websocket = await websockets.connect(
|
||||
f"{self._url}?api_key={self._api_key}&cartesia_version={self._cartesia_version}"
|
||||
)
|
||||
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
|
||||
self._context_appending_task = self.get_event_loop().create_task(self._context_appending_task_handler())
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} initialization error: {e}")
|
||||
self._websocket = None
|
||||
|
||||
async def _disconnect(self):
|
||||
try:
|
||||
if self._context_appending_task:
|
||||
self._context_appending_task.cancel()
|
||||
await self._context_appending_task
|
||||
self._context_appending_task = None
|
||||
if self._receive_task:
|
||||
self._receive_task.cancel()
|
||||
await self._receive_task
|
||||
self._receive_task = None
|
||||
if self._websocket:
|
||||
ws = self._websocket
|
||||
self._websocket = None
|
||||
await ws.close()
|
||||
self._context_id = None
|
||||
self._context_id_start_timestamp = None
|
||||
self._timestamped_words_buffer = []
|
||||
await self.stop_all_metrics()
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error closing websocket: {e}")
|
||||
|
||||
async def _handle_interruption(self, frame: StartInterruptionFrame, direction: FrameDirection):
|
||||
await super()._handle_interruption(frame, direction)
|
||||
self._context_id = None
|
||||
self._context_id_start_timestamp = None
|
||||
self._timestamped_words_buffer = []
|
||||
await self.stop_all_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
try:
|
||||
async for message in self._websocket:
|
||||
msg = json.loads(message)
|
||||
# logger.debug(f"Received message: {msg['type']} {msg['context_id']}")
|
||||
if not msg or msg["context_id"] != self._context_id:
|
||||
continue
|
||||
if msg["type"] == "done":
|
||||
await self.stop_ttfb_metrics()
|
||||
# unset _context_id but not the _context_id_start_timestamp because we are likely still
|
||||
# playing out audio and need the timestamp to set send context frames
|
||||
self._context_id = None
|
||||
self._timestamped_words_buffer.append(("LLMFullResponseEndFrame", 0))
|
||||
elif msg["type"] == "timestamps":
|
||||
# logger.debug(f"TIMESTAMPS: {msg}")
|
||||
self._timestamped_words_buffer.extend(
|
||||
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["end"]))
|
||||
)
|
||||
elif msg["type"] == "chunk":
|
||||
await self.stop_ttfb_metrics()
|
||||
if not self._context_id_start_timestamp:
|
||||
self._context_id_start_timestamp = time.time()
|
||||
frame = AudioRawFrame(
|
||||
audio=base64.b64decode(msg["data"]),
|
||||
sample_rate=self._output_format["sample_rate"],
|
||||
num_channels=1
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
async def _context_appending_task_handler(self):
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(0.1)
|
||||
if not self._context_id_start_timestamp:
|
||||
continue
|
||||
elapsed_seconds = time.time() - self._context_id_start_timestamp
|
||||
# pop all words from self._timestamped_words_buffer that are older than the
|
||||
# elapsed time and print a message about them to the console
|
||||
while self._timestamped_words_buffer and self._timestamped_words_buffer[0][1] <= elapsed_seconds:
|
||||
word, timestamp = self._timestamped_words_buffer.pop(0)
|
||||
if word == "LLMFullResponseEndFrame" and timestamp == 0:
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
continue
|
||||
# print(f"Word '{word}' with timestamp {timestamp:.2f}s has been spoken.")
|
||||
await self.push_frame(TextFrame(word))
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
try:
|
||||
await self.start_ttfb_metrics()
|
||||
if not self._websocket:
|
||||
await self._connect()
|
||||
|
||||
chunk_generator = await self._client.tts.sse(
|
||||
stream=True,
|
||||
transcript=text,
|
||||
voice_embedding=self._voice["embedding"],
|
||||
model_id=self._model_id,
|
||||
output_format=self._output_format,
|
||||
)
|
||||
if not self._context_id:
|
||||
await self.start_ttfb_metrics()
|
||||
self._context_id = str(uuid.uuid4())
|
||||
|
||||
async for chunk in chunk_generator:
|
||||
await self.stop_ttfb_metrics()
|
||||
yield AudioRawFrame(chunk["audio"], self._output_format["sample_rate"], 1)
|
||||
msg = {
|
||||
"transcript": text + " ",
|
||||
"continue": True,
|
||||
"context_id": self._context_id,
|
||||
"model_id": self._model_id,
|
||||
"voice": {
|
||||
"mode": "id",
|
||||
"id": self._voice_id
|
||||
},
|
||||
"output_format": self._output_format,
|
||||
"language": self._language,
|
||||
"add_timestamps": True,
|
||||
}
|
||||
# logger.debug(f"SENDING MESSAGE {json.dumps(msg)}")
|
||||
try:
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error sending message: {e}")
|
||||
await self._disconnect()
|
||||
await self._connect()
|
||||
return
|
||||
yield None
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
@@ -5,8 +5,6 @@
|
||||
#
|
||||
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from typing import AsyncGenerator
|
||||
|
||||
@@ -18,14 +16,15 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
SystemFrame,
|
||||
TranscriptionFrame)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import AIService, TTSService
|
||||
from pipecat.services.ai_services import AsyncAIService, TTSService
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
# See .env.example for Deepgram configuration needed
|
||||
try:
|
||||
from deepgram import (
|
||||
@@ -61,6 +60,10 @@ class DeepgramTTSService(TTSService):
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
async def set_voice(self, voice: str):
|
||||
logger.debug(f"Switching TTS voice to: [{voice}]")
|
||||
self._voice = voice
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
@@ -91,11 +94,12 @@ class DeepgramTTSService(TTSService):
|
||||
frame = AudioRawFrame(audio=data, sample_rate=16000, num_channels=1)
|
||||
yield frame
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
|
||||
|
||||
class DeepgramSTTService(AIService):
|
||||
class DeepgramSTTService(AsyncAIService):
|
||||
def __init__(self,
|
||||
*,
|
||||
api_key: str,
|
||||
url: str = "",
|
||||
live_options: LiveOptions = LiveOptions(
|
||||
@@ -117,19 +121,15 @@ class DeepgramSTTService(AIService):
|
||||
self._connection = self._client.listen.asynclive.v("1")
|
||||
self._connection.on(LiveTranscriptionEvents.Transcript, self._on_message)
|
||||
|
||||
self._create_push_task()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, StartInterruptionFrame):
|
||||
await self._handle_interruptions(frame)
|
||||
elif isinstance(frame, SystemFrame):
|
||||
if isinstance(frame, SystemFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, AudioRawFrame):
|
||||
await self._connection.send(frame.audio)
|
||||
else:
|
||||
await self._push_queue.put((frame, direction))
|
||||
await self.queue_frame(frame, direction)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
if await self._connection.start(self._live_options):
|
||||
@@ -139,37 +139,9 @@ class DeepgramSTTService(AIService):
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await self._connection.finish()
|
||||
await self._push_queue.put((frame, FrameDirection.DOWNSTREAM))
|
||||
await self._push_frame_task
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await self._connection.finish()
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
|
||||
async def _handle_interruptions(self, frame: Frame):
|
||||
# Cancel the task. This will stop pushing frames downstream.
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
# Push an out-of-band frame (i.e. not using the ordered push
|
||||
# frame task).
|
||||
await self.push_frame(frame)
|
||||
# Create a new queue and task.
|
||||
self._create_push_task()
|
||||
|
||||
def _create_push_task(self):
|
||||
self._push_queue = asyncio.Queue()
|
||||
self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler())
|
||||
|
||||
async def _push_frame_task_handler(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
(frame, direction) = await self._push_queue.get()
|
||||
await self.push_frame(frame, direction)
|
||||
running = not isinstance(frame, EndFrame)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
async def _on_message(self, *args, **kwargs):
|
||||
result = kwargs["result"]
|
||||
@@ -177,6 +149,6 @@ class DeepgramSTTService(AIService):
|
||||
transcript = result.channel.alternatives[0].transcript
|
||||
if len(transcript) > 0:
|
||||
if is_final:
|
||||
await self._push_queue.put((TranscriptionFrame(transcript, "", int(time.time_ns() / 1000000)), FrameDirection.DOWNSTREAM))
|
||||
await self.queue_frame(TranscriptionFrame(transcript, "", time_now_iso8601()))
|
||||
else:
|
||||
await self._push_queue.put((InterimTranscriptionFrame(transcript, "", int(time.time_ns() / 1000000)), FrameDirection.DOWNSTREAM))
|
||||
await self.queue_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601()))
|
||||
|
||||
@@ -34,6 +34,10 @@ class ElevenLabsTTSService(TTSService):
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
async def set_voice(self, voice: str):
|
||||
logger.debug(f"Switching TTS voice to: [{voice}]")
|
||||
self._voice_id = voice
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ class FalImageGenService(ImageGenService):
|
||||
|
||||
response = await fal_client.run_async(
|
||||
self._model,
|
||||
arguments={"prompt": prompt, **self._params.model_dump()}
|
||||
arguments={"prompt": prompt, **self._params.model_dump(exclude_none=True)}
|
||||
)
|
||||
|
||||
image_url = response["images"][0]["url"] if response else None
|
||||
|
||||
@@ -19,6 +19,7 @@ except ModuleNotFoundError as e:
|
||||
|
||||
class FireworksLLMService(BaseOpenAILLMService):
|
||||
def __init__(self,
|
||||
*,
|
||||
model: str = "accounts/fireworks/models/firefunction-v1",
|
||||
base_url: str = "https://api.fireworks.ai/inference/v1"):
|
||||
super().__init__(model, base_url)
|
||||
|
||||
115
src/pipecat/services/gladia.py
Normal file
115
src/pipecat/services/gladia.py
Normal file
@@ -0,0 +1,115 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import base64
|
||||
import json
|
||||
|
||||
from typing import Optional
|
||||
from pydantic.main import BaseModel
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
SystemFrame,
|
||||
TranscriptionFrame)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import AsyncAIService
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
from loguru import logger
|
||||
|
||||
# See .env.example for Gladia configuration needed
|
||||
try:
|
||||
import websockets
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use Gladia, you need to `pip install pipecat-ai[gladia]`. Also, set `GLADIA_API_KEY` environment variable.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class GladiaSTTService(AsyncAIService):
|
||||
class InputParams(BaseModel):
|
||||
sample_rate: Optional[int] = 16000
|
||||
language: Optional[str] = "english"
|
||||
transcription_hint: Optional[str] = None
|
||||
endpointing: Optional[int] = 200
|
||||
prosody: Optional[bool] = None
|
||||
|
||||
def __init__(self,
|
||||
*,
|
||||
api_key: str,
|
||||
url: str = "wss://api.gladia.io/audio/text/audio-transcription",
|
||||
confidence: float = 0.5,
|
||||
params: InputParams = InputParams(),
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self._api_key = api_key
|
||||
self._url = url
|
||||
self._params = params
|
||||
self._confidence = confidence
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, SystemFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
elif isinstance(frame, AudioRawFrame):
|
||||
await self._send_audio(frame)
|
||||
else:
|
||||
await self.queue_frame(frame, direction)
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
self._websocket = await websockets.connect(self._url)
|
||||
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
|
||||
await self._setup_gladia()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await self._websocket.close()
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await self._websocket.close()
|
||||
|
||||
async def _setup_gladia(self):
|
||||
configuration = {
|
||||
"x_gladia_key": self._api_key,
|
||||
"encoding": "WAV/PCM",
|
||||
"model_type": "fast",
|
||||
"language_behaviour": "manual",
|
||||
**self._params.model_dump(exclude_none=True)
|
||||
}
|
||||
|
||||
await self._websocket.send(json.dumps(configuration))
|
||||
|
||||
async def _send_audio(self, frame: AudioRawFrame):
|
||||
message = {
|
||||
'frames': base64.b64encode(frame.audio).decode("utf-8")
|
||||
}
|
||||
await self._websocket.send(json.dumps(message))
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
async for message in self._websocket:
|
||||
utterance = json.loads(message)
|
||||
if not utterance:
|
||||
continue
|
||||
|
||||
if "error" in utterance:
|
||||
message = utterance["message"]
|
||||
logger.error(f"Gladia error: {message}")
|
||||
elif "confidence" in utterance:
|
||||
type = utterance["type"]
|
||||
confidence = utterance["confidence"]
|
||||
transcript = utterance["transcription"]
|
||||
if confidence >= self._confidence:
|
||||
if type == "final":
|
||||
await self.queue_frame(TranscriptionFrame(transcript, "", time_now_iso8601()))
|
||||
else:
|
||||
await self.queue_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601()))
|
||||
@@ -10,12 +10,11 @@ from typing import List
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMModelUpdateFrame,
|
||||
TextFrame,
|
||||
VisionImageRawFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMResponseStartFrame,
|
||||
LLMResponseEndFrame,
|
||||
LLMFullResponseEndFrame
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
@@ -42,14 +41,17 @@ class GoogleLLMService(LLMService):
|
||||
franca for all LLM services, so that it is easy to switch between different LLMs.
|
||||
"""
|
||||
|
||||
def __init__(self, api_key: str, model: str = "gemini-1.5-flash-latest", **kwargs):
|
||||
def __init__(self, *, api_key: str, model: str = "gemini-1.5-flash-latest", **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
gai.configure(api_key=api_key)
|
||||
self._client = gai.GenerativeModel(model)
|
||||
self._create_client(model)
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
def _create_client(self, model: str):
|
||||
self._client = gai.GenerativeModel(model)
|
||||
|
||||
def _get_messages_from_openai_context(
|
||||
self, context: OpenAILLMContext) -> List[glm.Content]:
|
||||
openai_messages = context.get_messages()
|
||||
@@ -95,19 +97,17 @@ class GoogleLLMService(LLMService):
|
||||
async for chunk in self._async_generator_wrapper(response):
|
||||
try:
|
||||
text = chunk.text
|
||||
await self.push_frame(LLMResponseStartFrame())
|
||||
await self.push_frame(TextFrame(text))
|
||||
await self.push_frame(LLMResponseEndFrame())
|
||||
except Exception as e:
|
||||
# Google LLMs seem to flag safety issues a lot!
|
||||
if chunk.candidates[0].finish_reason == 3:
|
||||
logger.debug(
|
||||
f"LLM refused to generate content for safety reasons - {messages}.")
|
||||
else:
|
||||
logger.error(f"{self} error: {e}")
|
||||
logger.exception(f"{self} error: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
finally:
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
@@ -122,6 +122,9 @@ class GoogleLLMService(LLMService):
|
||||
context = OpenAILLMContext.from_messages(frame.messages)
|
||||
elif isinstance(frame, VisionImageRawFrame):
|
||||
context = OpenAILLMContext.from_image_frame(frame)
|
||||
elif isinstance(frame, LLMModelUpdateFrame):
|
||||
logger.debug(f"Switching LLM model to: [{frame.model}]")
|
||||
self._create_client(frame.model)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
@@ -46,6 +46,7 @@ def detect_device():
|
||||
class MoondreamService(VisionService):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
model="vikhyatk/moondream2",
|
||||
revision="2024-04-02",
|
||||
use_cpu=False
|
||||
|
||||
@@ -9,5 +9,5 @@ from pipecat.services.openai import BaseOpenAILLMService
|
||||
|
||||
class OLLamaLLMService(BaseOpenAILLMService):
|
||||
|
||||
def __init__(self, model: str = "llama2", base_url: str = "http://localhost:11434/v1"):
|
||||
def __init__(self, *, model: str = "llama2", base_url: str = "http://localhost:11434/v1"):
|
||||
super().__init__(model=model, base_url=base_url, api_key="ollama")
|
||||
|
||||
@@ -8,8 +8,9 @@ import aiohttp
|
||||
import base64
|
||||
import io
|
||||
import json
|
||||
import httpx
|
||||
|
||||
from typing import Any, AsyncGenerator, List, Literal
|
||||
from typing import AsyncGenerator, List, Literal
|
||||
|
||||
from loguru import logger
|
||||
from PIL import Image
|
||||
@@ -21,8 +22,7 @@ from pipecat.frames.frames import (
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
LLMResponseEndFrame,
|
||||
LLMResponseStartFrame,
|
||||
LLMModelUpdateFrame,
|
||||
TextFrame,
|
||||
URLImageRawFrame,
|
||||
VisionImageRawFrame
|
||||
@@ -39,7 +39,7 @@ from pipecat.services.ai_services import (
|
||||
)
|
||||
|
||||
try:
|
||||
from openai import AsyncOpenAI, AsyncStream, BadRequestError
|
||||
from openai import AsyncOpenAI, AsyncStream, DefaultAsyncHttpxClient, BadRequestError
|
||||
from openai.types.chat import (
|
||||
ChatCompletionChunk,
|
||||
ChatCompletionFunctionMessageParam,
|
||||
@@ -53,7 +53,7 @@ except ModuleNotFoundError as e:
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class OpenAIUnhandledFunctionException(BaseException):
|
||||
class OpenAIUnhandledFunctionException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
@@ -67,13 +67,20 @@ class BaseOpenAILLMService(LLMService):
|
||||
calls from the LLM.
|
||||
"""
|
||||
|
||||
def __init__(self, model: str, api_key=None, base_url=None, **kwargs):
|
||||
def __init__(self, *, model: str, api_key=None, base_url=None, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._model: str = model
|
||||
self._client = self.create_client(api_key=api_key, base_url=base_url, **kwargs)
|
||||
|
||||
def create_client(self, api_key=None, base_url=None, **kwargs):
|
||||
return AsyncOpenAI(api_key=api_key, base_url=base_url)
|
||||
return AsyncOpenAI(
|
||||
api_key=api_key,
|
||||
base_url=base_url,
|
||||
http_client=DefaultAsyncHttpxClient(
|
||||
limits=httpx.Limits(
|
||||
max_keepalive_connections=100,
|
||||
max_connections=1000,
|
||||
keepalive_expiry=None)))
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
@@ -109,10 +116,7 @@ class BaseOpenAILLMService(LLMService):
|
||||
del message["data"]
|
||||
del message["mime_type"]
|
||||
|
||||
try:
|
||||
chunks = await self.get_chat_completions(context, messages)
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
chunks = await self.get_chat_completions(context, messages)
|
||||
|
||||
return chunks
|
||||
|
||||
@@ -154,9 +158,7 @@ class BaseOpenAILLMService(LLMService):
|
||||
# Keep iterating through the response to collect all the argument fragments
|
||||
arguments += tool_call.function.arguments
|
||||
elif chunk.choices[0].delta.content:
|
||||
await self.push_frame(LLMResponseStartFrame())
|
||||
await self.push_frame(TextFrame(chunk.choices[0].delta.content))
|
||||
await self.push_frame(LLMResponseEndFrame())
|
||||
|
||||
# if we got a function name and arguments, check to see if it's a function with
|
||||
# a registered handler. If so, run the registered callback, save the result to
|
||||
@@ -214,7 +216,7 @@ class BaseOpenAILLMService(LLMService):
|
||||
elif isinstance(result, type(None)):
|
||||
pass
|
||||
else:
|
||||
raise BaseException(f"Unknown return type from function callback: {type(result)}")
|
||||
raise TypeError(f"Unknown return type from function callback: {type(result)}")
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
@@ -226,19 +228,24 @@ class BaseOpenAILLMService(LLMService):
|
||||
context = OpenAILLMContext.from_messages(frame.messages)
|
||||
elif isinstance(frame, VisionImageRawFrame):
|
||||
context = OpenAILLMContext.from_image_frame(frame)
|
||||
elif isinstance(frame, LLMModelUpdateFrame):
|
||||
logger.debug(f"Switching LLM model to: [{frame.model}]")
|
||||
self._model = frame.model
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
if context:
|
||||
await self.push_frame(LLMFullResponseStartFrame())
|
||||
await self.start_processing_metrics()
|
||||
await self._process_context(context)
|
||||
await self.stop_processing_metrics()
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
|
||||
class OpenAILLMService(BaseOpenAILLMService):
|
||||
|
||||
def __init__(self, model="gpt-4o", **kwargs):
|
||||
super().__init__(model, **kwargs)
|
||||
def __init__(self, *, model: str = "gpt-4o", **kwargs):
|
||||
super().__init__(model=model, **kwargs)
|
||||
|
||||
|
||||
class OpenAIImageGenService(ImageGenService):
|
||||
@@ -310,6 +317,10 @@ class OpenAITTSService(TTSService):
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
async def set_voice(self, voice: str):
|
||||
logger.debug(f"Switching TTS voice to: [{voice}]")
|
||||
self._voice = voice
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
|
||||
@@ -334,4 +345,4 @@ class OpenAITTSService(TTSService):
|
||||
frame = AudioRawFrame(chunk, 24_000, 1)
|
||||
yield frame
|
||||
except BadRequestError as e:
|
||||
logger.error(f"{self} error generating TTS: {e}")
|
||||
logger.exception(f"{self} error generating TTS: {e}")
|
||||
|
||||
@@ -25,6 +25,7 @@ class OpenPipeLLMService(BaseOpenAILLMService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
model: str = "gpt-4o",
|
||||
api_key: str | None = None,
|
||||
base_url: str | None = None,
|
||||
@@ -33,9 +34,9 @@ class OpenPipeLLMService(BaseOpenAILLMService):
|
||||
tags: Dict[str, str] | None = None,
|
||||
**kwargs):
|
||||
super().__init__(
|
||||
model,
|
||||
api_key,
|
||||
base_url,
|
||||
model=model,
|
||||
api_key=api_key,
|
||||
base_url=base_url,
|
||||
openpipe_api_key=openpipe_api_key,
|
||||
openpipe_base_url=openpipe_base_url,
|
||||
**kwargs)
|
||||
|
||||
@@ -80,4 +80,4 @@ class PlayHTTTSService(TTSService):
|
||||
frame = AudioRawFrame(chunk, 16000, 1)
|
||||
yield frame
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error generating TTS: {e}")
|
||||
logger.exception(f"{self} error generating TTS: {e}")
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
"""This module implements Whisper transcription with a locally-downloaded model."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from enum import Enum
|
||||
from typing_extensions import AsyncGenerator
|
||||
@@ -16,6 +15,7 @@ import numpy as np
|
||||
|
||||
from pipecat.frames.frames import ErrorFrame, Frame, TranscriptionFrame
|
||||
from pipecat.services.ai_services import STTService
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -42,7 +42,8 @@ class WhisperSTTService(STTService):
|
||||
"""Class to transcribe audio with a locally-downloaded Whisper model"""
|
||||
|
||||
def __init__(self,
|
||||
model: Model = Model.DISTIL_MEDIUM_EN,
|
||||
*,
|
||||
model: str | Model = Model.DISTIL_MEDIUM_EN,
|
||||
device: str = "auto",
|
||||
compute_type: str = "default",
|
||||
no_speech_prob: float = 0.4,
|
||||
@@ -51,7 +52,7 @@ class WhisperSTTService(STTService):
|
||||
super().__init__(**kwargs)
|
||||
self._device: str = device
|
||||
self._compute_type = compute_type
|
||||
self._model_name: Model = model
|
||||
self._model_name: str | Model = model
|
||||
self._no_speech_prob = no_speech_prob
|
||||
self._model: WhisperModel | None = None
|
||||
self._load()
|
||||
@@ -64,7 +65,7 @@ class WhisperSTTService(STTService):
|
||||
this model is being run, it will take time to download."""
|
||||
logger.debug("Loading Whisper model...")
|
||||
self._model = WhisperModel(
|
||||
self._model_name.value,
|
||||
self._model_name.value if isinstance(self._model_name, Enum) else self._model_name,
|
||||
device=self._device,
|
||||
compute_type=self._compute_type)
|
||||
logger.debug("Loaded Whisper model")
|
||||
@@ -90,4 +91,4 @@ class WhisperSTTService(STTService):
|
||||
if text:
|
||||
await self.stop_ttfb_metrics()
|
||||
logger.debug(f"Transcription: [{text}]")
|
||||
yield TranscriptionFrame(text, "", int(time.time_ns() / 1000000))
|
||||
yield TranscriptionFrame(text, "", time_now_iso8601())
|
||||
|
||||
116
src/pipecat/services/xtts.py
Normal file
116
src/pipecat/services/xtts.py
Normal file
@@ -0,0 +1,116 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import aiohttp
|
||||
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from pipecat.frames.frames import AudioRawFrame, ErrorFrame, Frame
|
||||
from pipecat.services.ai_services import TTSService
|
||||
|
||||
from loguru import logger
|
||||
|
||||
import requests
|
||||
|
||||
import numpy as np
|
||||
|
||||
try:
|
||||
import resampy
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error("In order to use XTTS, you need to `pip install pipecat-ai[xtts]`.")
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
# The server below can connect to XTTS through a local running docker
|
||||
#
|
||||
# Docker command: $ docker run --gpus=all -e COQUI_TOS_AGREED=1 --rm -p 8000:80 ghcr.io/coqui-ai/xtts-streaming-server:latest-cuda121
|
||||
#
|
||||
# You can find more information on the official repo:
|
||||
# https://github.com/coqui-ai/xtts-streaming-server
|
||||
|
||||
|
||||
class XTTSService(TTSService):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
voice_id: str,
|
||||
language: str,
|
||||
base_url: str,
|
||||
**kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self._voice_id = voice_id
|
||||
self._language = language
|
||||
self._base_url = base_url
|
||||
self._aiohttp_session = aiohttp_session
|
||||
self._studio_speakers = requests.get(self._base_url + "/studio_speakers").json()
|
||||
|
||||
def can_generate_metrics(self) -> bool:
|
||||
return True
|
||||
|
||||
async def set_voice(self, voice: str):
|
||||
logger.debug(f"Switching TTS voice to: [{voice}]")
|
||||
self._voice_id = voice
|
||||
|
||||
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
|
||||
logger.debug(f"Generating TTS: [{text}]")
|
||||
embeddings = self._studio_speakers[self._voice_id]
|
||||
|
||||
url = self._base_url + "/tts_stream"
|
||||
|
||||
payload = {
|
||||
"text": text.replace('.', '').replace('*', ''),
|
||||
"language": self._language,
|
||||
"speaker_embedding": embeddings["speaker_embedding"],
|
||||
"gpt_cond_latent": embeddings["gpt_cond_latent"],
|
||||
"add_wav_header": False,
|
||||
"stream_chunk_size": 20,
|
||||
}
|
||||
|
||||
await self.start_ttfb_metrics()
|
||||
|
||||
async with self._aiohttp_session.post(url, json=payload) as r:
|
||||
if r.status != 200:
|
||||
text = await r.text()
|
||||
logger.error(f"{self} error getting audio (status: {r.status}, error: {text})")
|
||||
yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})")
|
||||
return
|
||||
|
||||
buffer = bytearray()
|
||||
|
||||
async for chunk in r.content.iter_chunked(1024):
|
||||
if len(chunk) > 0:
|
||||
await self.stop_ttfb_metrics()
|
||||
# Append new chunk to the buffer
|
||||
buffer.extend(chunk)
|
||||
|
||||
# Check if buffer has enough data for processing
|
||||
while len(buffer) >= 48000: # Assuming at least 0.5 seconds of audio data at 24000 Hz
|
||||
# Process the buffer up to a safe size for resampling
|
||||
process_data = buffer[:48000]
|
||||
# Remove processed data from buffer
|
||||
buffer = buffer[48000:]
|
||||
|
||||
# Convert the byte data to numpy array for resampling
|
||||
audio_np = np.frombuffer(process_data, dtype=np.int16)
|
||||
# Resample the audio from 24000 Hz to 16000 Hz
|
||||
resampled_audio = resampy.resample(audio_np, 24000, 16000)
|
||||
# Convert the numpy array back to bytes
|
||||
resampled_audio_bytes = resampled_audio.astype(np.int16).tobytes()
|
||||
# Create the frame with the resampled audio
|
||||
frame = AudioRawFrame(resampled_audio_bytes, 16000, 1)
|
||||
yield frame
|
||||
|
||||
# Process any remaining data in the buffer
|
||||
if len(buffer) > 0:
|
||||
audio_np = np.frombuffer(buffer, dtype=np.int16)
|
||||
resampled_audio = resampy.resample(audio_np, 24000, 16000)
|
||||
resampled_audio_bytes = resampled_audio.astype(np.int16).tobytes()
|
||||
frame = AudioRawFrame(resampled_audio_bytes, 16000, 1)
|
||||
yield frame
|
||||
@@ -11,6 +11,7 @@ from concurrent.futures import ThreadPoolExecutor
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
BotInterruptionFrame,
|
||||
CancelFrame,
|
||||
StartFrame,
|
||||
EndFrame,
|
||||
@@ -78,6 +79,8 @@ class BaseInputTransport(FrameProcessor):
|
||||
elif isinstance(frame, EndFrame):
|
||||
await self._internal_push_frame(frame, direction)
|
||||
await self.stop()
|
||||
elif isinstance(frame, BotInterruptionFrame):
|
||||
await self._handle_interruptions(frame, False)
|
||||
else:
|
||||
await self._internal_push_frame(frame, direction)
|
||||
|
||||
@@ -101,6 +104,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
try:
|
||||
(frame, direction) = await self._push_queue.get()
|
||||
await self.push_frame(frame, direction)
|
||||
self._push_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
@@ -108,24 +112,35 @@ class BaseInputTransport(FrameProcessor):
|
||||
# Handle interruptions
|
||||
#
|
||||
|
||||
async def _handle_interruptions(self, frame: Frame):
|
||||
async def _start_interruption(self):
|
||||
# Cancel the task. This will stop pushing frames downstream.
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
# Push an out-of-band frame (i.e. not using the ordered push
|
||||
# frame task) to stop everything, specially at the output
|
||||
# transport.
|
||||
await self.push_frame(StartInterruptionFrame())
|
||||
# Create a new queue and task.
|
||||
self._create_push_task()
|
||||
|
||||
async def _stop_interruption(self):
|
||||
await self.push_frame(StopInterruptionFrame())
|
||||
|
||||
async def _handle_interruptions(self, frame: Frame, push_frame: bool):
|
||||
if self.interruptions_allowed:
|
||||
# Make sure we notify about interruptions quickly out-of-band
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
if isinstance(frame, BotInterruptionFrame):
|
||||
logger.debug("Bot interruption")
|
||||
await self._start_interruption()
|
||||
elif isinstance(frame, UserStartedSpeakingFrame):
|
||||
logger.debug("User started speaking")
|
||||
# Cancel the task. This will stop pushing frames downstream.
|
||||
self._push_frame_task.cancel()
|
||||
await self._push_frame_task
|
||||
# Push an out-of-band frame (i.e. not using the ordered push
|
||||
# frame task) to stop everything, specially at the output
|
||||
# transport.
|
||||
await self.push_frame(StartInterruptionFrame())
|
||||
# Create a new queue and task.
|
||||
self._create_push_task()
|
||||
await self._start_interruption()
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
logger.debug("User stopped speaking")
|
||||
await self.push_frame(StopInterruptionFrame())
|
||||
await self._internal_push_frame(frame)
|
||||
await self._stop_interruption()
|
||||
|
||||
if push_frame:
|
||||
await self._internal_push_frame(frame)
|
||||
|
||||
#
|
||||
# Audio input
|
||||
@@ -149,7 +164,7 @@ class BaseInputTransport(FrameProcessor):
|
||||
frame = UserStoppedSpeakingFrame()
|
||||
|
||||
if frame:
|
||||
await self._handle_interruptions(frame)
|
||||
await self._handle_interruptions(frame, True)
|
||||
|
||||
vad_state = new_vad_state
|
||||
return vad_state
|
||||
@@ -171,7 +186,9 @@ class BaseInputTransport(FrameProcessor):
|
||||
# Push audio downstream if passthrough.
|
||||
if audio_passthrough:
|
||||
await self._internal_push_frame(frame)
|
||||
|
||||
self._audio_in_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except BaseException as e:
|
||||
logger.error(f"{self} error reading audio frames: {e}")
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error reading audio frames: {e}")
|
||||
|
||||
@@ -14,6 +14,7 @@ from typing import List
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.frames.frames import (
|
||||
AudioRawFrame,
|
||||
BotSpeakingFrame,
|
||||
CancelFrame,
|
||||
MetricsFrame,
|
||||
SpriteFrame,
|
||||
@@ -180,8 +181,8 @@ class BaseOutputTransport(FrameProcessor):
|
||||
self._sink_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except BaseException as e:
|
||||
logger.error(f"{self} error processing sink queue: {e}")
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} error processing sink queue: {e}")
|
||||
|
||||
#
|
||||
# Push frames task
|
||||
@@ -203,6 +204,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
try:
|
||||
(frame, direction) = await self._push_queue.get()
|
||||
await self.push_frame(frame, direction)
|
||||
self._push_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
@@ -250,7 +252,7 @@ class BaseOutputTransport(FrameProcessor):
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error writing to camera: {e}")
|
||||
logger.exception(f"{self} error writing to camera: {e}")
|
||||
|
||||
#
|
||||
# Audio out
|
||||
@@ -263,4 +265,5 @@ class BaseOutputTransport(FrameProcessor):
|
||||
if len(buffer) >= self._audio_chunk_size:
|
||||
await self.write_raw_audio_frames(bytes(buffer[:self._audio_chunk_size]))
|
||||
buffer = buffer[self._audio_chunk_size:]
|
||||
await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
|
||||
return buffer
|
||||
|
||||
@@ -82,5 +82,4 @@ class BaseTransport(ABC):
|
||||
else:
|
||||
handler(self, *args, **kwargs)
|
||||
except Exception as e:
|
||||
logger.error(f"Exception in event handler {event_name}: {e}")
|
||||
raise e
|
||||
logger.exception(f"Exception in event handler {event_name}: {e}")
|
||||
|
||||
@@ -12,7 +12,6 @@ import wave
|
||||
from typing import Awaitable, Callable
|
||||
from pydantic.main import BaseModel
|
||||
|
||||
from pipecat.serializers.twilio import TwilioFrameSerializer
|
||||
from pipecat.frames.frames import AudioRawFrame, StartFrame
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.serializers.base_serializer import FrameSerializer
|
||||
@@ -114,7 +113,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
|
||||
frame = wav_frame
|
||||
|
||||
payload = self._params.serializer.serialize(frame)
|
||||
if payload:
|
||||
if payload and self._websocket.client_state == WebSocketState.CONNECTED:
|
||||
await self._websocket.send_text(payload)
|
||||
|
||||
self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:]
|
||||
|
||||
@@ -124,6 +124,9 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
self._websocket = websocket
|
||||
|
||||
async def write_raw_audio_frames(self, frames: bytes):
|
||||
if not self._websocket:
|
||||
return
|
||||
|
||||
self._audio_buffer += frames
|
||||
while len(self._audio_buffer) >= self._params.audio_frame_size:
|
||||
frame = AudioRawFrame(
|
||||
@@ -148,8 +151,8 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
|
||||
frame = wav_frame
|
||||
|
||||
proto = self._params.serializer.serialize(frame)
|
||||
|
||||
await self._websocket.send(proto)
|
||||
if proto:
|
||||
await self._websocket.send(proto)
|
||||
|
||||
self._audio_buffer = self._audio_buffer[self._params.audio_frame_size:]
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ import asyncio
|
||||
import time
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Awaitable, Callable, Mapping
|
||||
from typing import Any, Awaitable, Callable, Mapping, Optional
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from daily import (
|
||||
@@ -59,8 +59,8 @@ class DailyTransportMessageFrame(TransportMessageFrame):
|
||||
|
||||
class WebRTCVADAnalyzer(VADAnalyzer):
|
||||
|
||||
def __init__(self, sample_rate=16000, num_channels=1, params: VADParams = VADParams()):
|
||||
super().__init__(sample_rate, num_channels, params)
|
||||
def __init__(self, *, sample_rate=16000, num_channels=1, params: VADParams = VADParams()):
|
||||
super().__init__(sample_rate=sample_rate, num_channels=num_channels, params=params)
|
||||
|
||||
self._webrtc_vad = Daily.create_native_vad(
|
||||
reset_period_ms=VAD_RESET_PERIOD_MS,
|
||||
@@ -101,7 +101,7 @@ class DailyTranscriptionSettings(BaseModel):
|
||||
class DailyParams(TransportParams):
|
||||
api_url: str = "https://api.daily.co/v1"
|
||||
api_key: str = ""
|
||||
dialin_settings: DailyDialinSettings | None = None
|
||||
dialin_settings: Optional[DailyDialinSettings] = None
|
||||
transcription_enabled: bool = False
|
||||
transcription_settings: DailyTranscriptionSettings = DailyTranscriptionSettings()
|
||||
|
||||
@@ -198,11 +198,18 @@ class DailyTransportClient(EventHandler):
|
||||
def set_callbacks(self, callbacks: DailyCallbacks):
|
||||
self._callbacks = callbacks
|
||||
|
||||
async def send_message(self, frame: DailyTransportMessageFrame):
|
||||
async def send_message(self, frame: TransportMessageFrame):
|
||||
if not self._client:
|
||||
return
|
||||
|
||||
participant_id = None
|
||||
if isinstance(frame, DailyTransportMessageFrame):
|
||||
participant_id = frame.participant_id
|
||||
|
||||
future = self._loop.create_future()
|
||||
self._client.send_app_message(
|
||||
frame.message,
|
||||
frame.participant_id,
|
||||
participant_id,
|
||||
completion=completion_callback(future))
|
||||
await future
|
||||
|
||||
@@ -262,10 +269,7 @@ class DailyTransportClient(EventHandler):
|
||||
logger.info(f"Joined {self._room_url}")
|
||||
|
||||
if self._token and self._params.transcription_enabled:
|
||||
logger.info(
|
||||
f"Enabling transcription with settings {self._params.transcription_settings}")
|
||||
self._client.start_transcription(
|
||||
self._params.transcription_settings.model_dump())
|
||||
await self._start_transcription()
|
||||
|
||||
await self._callbacks.on_joined(data["participants"]["local"])
|
||||
else:
|
||||
@@ -277,6 +281,17 @@ class DailyTransportClient(EventHandler):
|
||||
logger.error(error_msg)
|
||||
await self._callbacks.on_error(error_msg)
|
||||
|
||||
async def _start_transcription(self):
|
||||
future = self._loop.create_future()
|
||||
logger.info(f"Enabling transcription with settings {self._params.transcription_settings}")
|
||||
self._client.start_transcription(
|
||||
settings=self._params.transcription_settings.model_dump(exclude_none=True),
|
||||
completion=lambda error: future.set_result(error)
|
||||
)
|
||||
error = await future
|
||||
if error:
|
||||
logger.error(f"Unable to start transcription: {error}")
|
||||
|
||||
async def _join(self):
|
||||
future = self._loop.create_future()
|
||||
|
||||
@@ -336,7 +351,7 @@ class DailyTransportClient(EventHandler):
|
||||
logger.info(f"Leaving {self._room_url}")
|
||||
|
||||
if self._params.transcription_enabled:
|
||||
self._client.stop_transcription()
|
||||
await self._stop_transcription()
|
||||
|
||||
try:
|
||||
error = await self._leave()
|
||||
@@ -353,6 +368,13 @@ class DailyTransportClient(EventHandler):
|
||||
logger.error(error_msg)
|
||||
await self._callbacks.on_error(error_msg)
|
||||
|
||||
async def _stop_transcription(self):
|
||||
future = self._loop.create_future()
|
||||
self._client.stop_transcription(completion=lambda error: future.set_result(error))
|
||||
error = await future
|
||||
if error:
|
||||
logger.error(f"Unable to stop transcription: {error}")
|
||||
|
||||
async def _leave(self):
|
||||
future = self._loop.create_future()
|
||||
|
||||
@@ -652,16 +674,19 @@ class DailyOutputTransport(BaseOutputTransport):
|
||||
await super().cleanup()
|
||||
await self._client.cleanup()
|
||||
|
||||
async def send_message(self, frame: DailyTransportMessageFrame):
|
||||
async def send_message(self, frame: TransportMessageFrame):
|
||||
await self._client.send_message(frame)
|
||||
|
||||
async def send_metrics(self, frame: MetricsFrame):
|
||||
ttfb = [{"name": n, "time": t} for n, t in frame.ttfb.items()]
|
||||
metrics = {}
|
||||
if frame.ttfb:
|
||||
metrics["ttfb"] = frame.ttfb
|
||||
if frame.processing:
|
||||
metrics["processing"] = frame.processing
|
||||
|
||||
message = DailyTransportMessageFrame(message={
|
||||
"type": "pipecat-metrics",
|
||||
"metrics": {
|
||||
"ttfb": ttfb
|
||||
},
|
||||
"metrics": metrics
|
||||
})
|
||||
await self._client.send_message(message)
|
||||
|
||||
@@ -835,8 +860,8 @@ class DailyTransport(BaseTransport):
|
||||
logger.debug("Event dialin-ready was handled successfully")
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"Timeout handling dialin-ready event ({url})")
|
||||
except BaseException as e:
|
||||
logger.error(f"Error handling dialin-ready event ({url}): {e}")
|
||||
except Exception as e:
|
||||
logger.exception(f"Error handling dialin-ready event ({url}): {e}")
|
||||
|
||||
async def _on_dialin_ready(self, sip_endpoint):
|
||||
if self._params.dialin_settings:
|
||||
|
||||
@@ -2,7 +2,7 @@ from typing import List
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
|
||||
|
||||
class TestException(BaseException):
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
11
src/pipecat/utils/time.py
Normal file
11
src/pipecat/utils/time.py
Normal file
@@ -0,0 +1,11 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import datetime
|
||||
|
||||
|
||||
def time_now_iso8601() -> str:
|
||||
return datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="milliseconds")
|
||||
@@ -33,17 +33,27 @@ _MODEL_RESET_STATES_TIME = 5.0
|
||||
|
||||
class SileroVADAnalyzer(VADAnalyzer):
|
||||
|
||||
def __init__(self, sample_rate=16000, params: VADParams = VADParams()):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
sample_rate: int = 16000,
|
||||
version: str = "v5.0",
|
||||
force_reload: bool = False,
|
||||
skip_validation: bool = True,
|
||||
trust_repo: bool = True,
|
||||
params: VADParams = VADParams()):
|
||||
super().__init__(sample_rate=sample_rate, num_channels=1, params=params)
|
||||
|
||||
if sample_rate != 16000 and sample_rate != 8000:
|
||||
raise Exception("Silero VAD sample rate needs to be 16000 or 8000")
|
||||
raise ValueError("Silero VAD sample rate needs to be 16000 or 8000")
|
||||
|
||||
logger.debug("Loading Silero VAD model...")
|
||||
|
||||
(self._model, utils) = torch.hub.load(
|
||||
repo_or_dir="snakers4/silero-vad", model="silero_vad", force_reload=False
|
||||
)
|
||||
(self._model, _) = torch.hub.load(repo_or_dir=f"snakers4/silero-vad:{version}",
|
||||
model="silero_vad",
|
||||
force_reload=force_reload,
|
||||
skip_validation=skip_validation,
|
||||
trust_repo=trust_repo)
|
||||
|
||||
self._last_reset_time = 0
|
||||
|
||||
@@ -72,9 +82,9 @@ class SileroVADAnalyzer(VADAnalyzer):
|
||||
self._last_reset_time = curr_time
|
||||
|
||||
return new_confidence
|
||||
except BaseException as e:
|
||||
except Exception as e:
|
||||
# This comes from an empty audio array
|
||||
logger.error(f"Error analyzing audio with Silero VAD: {e}")
|
||||
logger.exception(f"Error analyzing audio with Silero VAD: {e}")
|
||||
return 0
|
||||
|
||||
|
||||
@@ -82,12 +92,23 @@ class SileroVAD(FrameProcessor):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
sample_rate: int = 16000,
|
||||
version: str = "v5.0",
|
||||
force_reload: bool = False,
|
||||
skip_validation: bool = True,
|
||||
trust_repo: bool = True,
|
||||
vad_params: VADParams = VADParams(),
|
||||
audio_passthrough: bool = False):
|
||||
super().__init__()
|
||||
|
||||
self._vad_analyzer = SileroVADAnalyzer(sample_rate=sample_rate, params=vad_params)
|
||||
self._vad_analyzer = SileroVADAnalyzer(
|
||||
sample_rate=sample_rate,
|
||||
version=version,
|
||||
force_reload=force_reload,
|
||||
skip_validation=skip_validation,
|
||||
trust_repo=trust_repo,
|
||||
params=vad_params)
|
||||
self._audio_passthrough = audio_passthrough
|
||||
|
||||
self._processor_vad_state: VADState = VADState.QUIET
|
||||
|
||||
@@ -28,7 +28,7 @@ class VADParams(BaseModel):
|
||||
|
||||
class VADAnalyzer:
|
||||
|
||||
def __init__(self, sample_rate: int, num_channels: int, params: VADParams):
|
||||
def __init__(self, *, sample_rate: int, num_channels: int, params: VADParams):
|
||||
self._sample_rate = sample_rate
|
||||
self._num_channels = num_channels
|
||||
self._params = params
|
||||
|
||||
@@ -8,8 +8,6 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.frames.frames import (
|
||||
LLMFullResponseStartFrame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMResponseEndFrame,
|
||||
LLMResponseStartFrame,
|
||||
TextFrame
|
||||
)
|
||||
from pipecat.utils.test_frame_processor import TestFrameProcessor
|
||||
@@ -64,7 +62,7 @@ if __name__ == "__main__":
|
||||
llm.register_function("get_current_weather", get_weather_from_api)
|
||||
t = TestFrameProcessor([
|
||||
LLMFullResponseStartFrame,
|
||||
[LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
|
||||
TextFrame,
|
||||
LLMFullResponseEndFrame
|
||||
])
|
||||
llm.link(t)
|
||||
@@ -98,7 +96,7 @@ if __name__ == "__main__":
|
||||
llm.register_function("get_current_weather", get_weather_from_api)
|
||||
t = TestFrameProcessor([
|
||||
LLMFullResponseStartFrame,
|
||||
[LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
|
||||
TextFrame,
|
||||
LLMFullResponseEndFrame
|
||||
])
|
||||
llm.link(t)
|
||||
@@ -121,7 +119,7 @@ if __name__ == "__main__":
|
||||
api_key = os.getenv("OPENAI_API_KEY")
|
||||
t = TestFrameProcessor([
|
||||
LLMFullResponseStartFrame,
|
||||
[LLMResponseStartFrame, TextFrame, LLMResponseEndFrame],
|
||||
TextFrame,
|
||||
LLMFullResponseEndFrame
|
||||
])
|
||||
llm = OpenAILLMService(
|
||||
|
||||
@@ -2,8 +2,8 @@ import unittest
|
||||
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from pipecat.services.ai_services import AIService
|
||||
from pipecat.pipeline.frames import EndFrame, Frame, TextFrame
|
||||
from pipecat.services.ai_services import AIService, match_endofsentence
|
||||
from pipecat.frames.frames import EndFrame, Frame, TextFrame
|
||||
|
||||
|
||||
class SimpleAIService(AIService):
|
||||
@@ -27,6 +27,22 @@ class TestBaseAIService(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
self.assertEqual(input_frames, output_frames)
|
||||
|
||||
async def test_endofsentence(self):
|
||||
assert match_endofsentence("This is a sentence.")
|
||||
assert match_endofsentence("This is a sentence! ")
|
||||
assert match_endofsentence("This is a sentence?")
|
||||
assert match_endofsentence("This is a sentence:")
|
||||
assert not match_endofsentence("This is not a sentence")
|
||||
assert not match_endofsentence("This is not a sentence,")
|
||||
assert not match_endofsentence("This is not a sentence, ")
|
||||
assert not match_endofsentence("Ok, Mr. Smith let's ")
|
||||
assert not match_endofsentence("Dr. Walker, I presume ")
|
||||
assert not match_endofsentence("Prof. Walker, I presume ")
|
||||
assert not match_endofsentence("zweitens, und 3.")
|
||||
assert not match_endofsentence("Heute ist Dienstag, der 3.") # 3. Juli 2024
|
||||
assert not match_endofsentence("America, or the U.") # U.S.A.
|
||||
assert not match_endofsentence("It still early, it's 3:00 a.") # 3:00 a.m.
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user