Compare commits
13 Commits
v0.0.46
...
khk/load-j
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b20687e32a | ||
|
|
388b3a239b | ||
|
|
b6b1ef0a40 | ||
|
|
e62f762382 | ||
|
|
dbfda14342 | ||
|
|
fee85418cd | ||
|
|
015faa3dbd | ||
|
|
1dbf4ff27d | ||
|
|
4f1b2dce9b | ||
|
|
5640bd9447 | ||
|
|
1fa52b62aa | ||
|
|
ec98a13a08 | ||
|
|
b64dbe7bb4 |
15
CHANGELOG.md
15
CHANGELOG.md
@@ -5,6 +5,21 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- Added `AssemblyAISTTService` and corresponding foundational examples
|
||||
`07o-interruptible-assemblyai.py` and `13d-assemblyai-transcription.py`.
|
||||
|
||||
- Added a foundational example for Gladia transcription:
|
||||
`13c-gladia-transcription.py`
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `enable_usage_metrics` to control LLM/TTS usage metrics separately
|
||||
from `enable_metrics`.
|
||||
|
||||
## [0.0.46] - 2024-10-19
|
||||
|
||||
### Added
|
||||
|
||||
@@ -38,7 +38,7 @@ pip install "pipecat-ai[option,...]"
|
||||
|
||||
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
|
||||
|
||||
- **AI services**: `anthropic`, `aws`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
|
||||
- **AI services**: `anthropic`, `assemblyai`, `aws`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
|
||||
- **Transports**: `local`, `websocket`, `daily`
|
||||
|
||||
## Code examples
|
||||
|
||||
@@ -1,16 +1,10 @@
|
||||
FROM python:3.10-bullseye
|
||||
|
||||
RUN mkdir /app
|
||||
RUN mkdir /app/assets
|
||||
RUN mkdir /app/utils
|
||||
COPY *.py /app/
|
||||
COPY requirements.txt /app/
|
||||
copy assets/* /app/assets/
|
||||
copy utils/* /app/utils/
|
||||
|
||||
WORKDIR /app
|
||||
RUN pip3 install -r requirements.txt
|
||||
|
||||
EXPOSE 7860
|
||||
|
||||
CMD ["python3", "server.py"]
|
||||
CMD ["python3", "server.py"]
|
||||
|
||||
@@ -27,7 +27,7 @@ cp env.example .env # and add your credentials
|
||||
python server.py
|
||||
```
|
||||
|
||||
Then, visit `http://localhost:7860/start` in your browser to start a chatbot session.
|
||||
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
|
||||
|
||||
## Build and test the Docker image
|
||||
|
||||
|
||||
@@ -2,4 +2,5 @@ DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bo
|
||||
DAILY_API_KEY=7df...
|
||||
OPENAI_API_KEY=sk-PL...
|
||||
ELEVENLABS_API_KEY=aeb...
|
||||
CANONICAL_API_KEY=can...
|
||||
CANONICAL_API_KEY=can...
|
||||
CANONICAL_API_URL=
|
||||
|
||||
@@ -59,7 +59,7 @@ app.add_middleware(
|
||||
)
|
||||
|
||||
|
||||
@app.get("/start")
|
||||
@app.get("/")
|
||||
async def start_agent(request: Request):
|
||||
print(f"!!! Creating room")
|
||||
room = await daily_helpers["rest"].create_room(DailyRoomParams())
|
||||
|
||||
@@ -27,7 +27,7 @@ cp env.example .env # and add your credentials
|
||||
python server.py
|
||||
```
|
||||
|
||||
Then, visit `http://localhost:7860/start` in your browser to start a chatbot session.
|
||||
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
|
||||
|
||||
## Build and test the Docker image
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ app.add_middleware(
|
||||
)
|
||||
|
||||
|
||||
@app.get("/start")
|
||||
@app.get("/")
|
||||
async def start_agent(request: Request):
|
||||
print(f"!!! Creating room")
|
||||
room = await daily_helpers["rest"].create_room(DailyRoomParams())
|
||||
|
||||
@@ -34,6 +34,6 @@ Note: you can do this manually via the fly.io dashboard under the "secrets" sub-
|
||||
|
||||
Send a post request to your running fly.io instance:
|
||||
|
||||
`curl --location --request POST 'https://YOUR_FLY_APP_NAME/start_bot'`
|
||||
`curl --location --request POST 'https://YOUR_FLY_APP_NAME/'`
|
||||
|
||||
This request will wait until the machine enters into a `starting` state, before returning a room URL and token to join.
|
||||
|
||||
@@ -124,7 +124,7 @@ async def spawn_fly_machine(room_url: str, token: str):
|
||||
print(f"Machine joined room: {room_url}")
|
||||
|
||||
|
||||
@app.post("/start_bot")
|
||||
@app.post("/")
|
||||
async def start_bot(request: Request) -> JSONResponse:
|
||||
try:
|
||||
data = await request.json()
|
||||
|
||||
97
examples/foundational/07o-interruptible-assemblyai.py
Normal file
97
examples/foundational/07o-interruptible-assemblyai.py
Normal file
@@ -0,0 +1,97 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.services.assemblyai import AssemblyAISTTService
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.openai import OpenAILLMService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def main():
    """Run the interruptible voice-bot example using AssemblyAI for STT.

    Joins the Daily room returned by ``configure``, wires
    transport input -> AssemblyAI STT -> user context -> OpenAI LLM ->
    Cartesia TTS -> transport output -> assistant context into a pipeline,
    and runs it with interruptions enabled until the runner returns.
    """
    async with aiohttp.ClientSession() as session:
        room_url, token = await configure(session)

        daily_params = DailyParams(
            audio_out_enabled=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            vad_audio_passthrough=True,
        )
        transport = DailyTransport(room_url, token, "Respond bot", daily_params)

        stt = AssemblyAISTTService(api_key=os.getenv("ASSEMBLYAI_API_KEY"))

        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
        )

        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

        # Conversation history shared by the context and the join handler below.
        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        context_aggregator = llm.create_context_aggregator(OpenAILLMContext(messages))

        stages = [
            transport.input(),  # Transport user input
            stt,  # STT
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
        task = PipelineTask(Pipeline(stages), PipelineParams(allow_interruptions=True))

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            transport.capture_participant_transcription(participant["id"])
            # Kick off the conversation.
            messages.append({"role": "system", "content": "Please introduce yourself to the user."})
            await task.queue_frames([LLMMessagesFrame(messages)])

        await PipelineRunner().run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
63
examples/foundational/13c-gladia-transcription.py
Normal file
63
examples/foundational/13c-gladia-transcription.py
Normal file
@@ -0,0 +1,63 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.frames.frames import Frame, TranscriptionFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.gladia import GladiaSTTService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
class TranscriptionLogger(FrameProcessor):
    """Frame processor that prints the text of every final transcription frame."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Run the base-class handling, then print ``frame.text`` for transcriptions."""
        await super().process_frame(frame, direction)

        if not isinstance(frame, TranscriptionFrame):
            return
        print(f"Transcription: {frame.text}")
|
||||
|
||||
|
||||
async def main():
    """Run the Gladia transcription example.

    Joins the Daily room returned by ``configure`` with audio input enabled
    and feeds transport input -> Gladia STT -> ``TranscriptionLogger``.
    """
    async with aiohttp.ClientSession() as session:
        room_url, _ = await configure(session)

        transport = DailyTransport(
            room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
        )

        stt = GladiaSTTService(
            api_key=os.getenv("GLADIA_API_KEY"),
            # live_options=LiveOptions(language=Language.FR),
        )

        transcription_logger = TranscriptionLogger()

        stages = [transport.input(), stt, transcription_logger]
        task = PipelineTask(Pipeline(stages))

        await PipelineRunner().run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
62
examples/foundational/13d-assemblyai-transcription.py
Normal file
62
examples/foundational/13d-assemblyai-transcription.py
Normal file
@@ -0,0 +1,62 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.frames.frames import Frame, TranscriptionFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.assemblyai import AssemblyAISTTService
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
class TranscriptionLogger(FrameProcessor):
    """Frame processor that prints the text of every final transcription frame."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Run the base-class handling, then print ``frame.text`` for transcriptions."""
        await super().process_frame(frame, direction)

        if not isinstance(frame, TranscriptionFrame):
            return
        print(f"Transcription: {frame.text}")
|
||||
|
||||
|
||||
async def main():
    """Run the AssemblyAI transcription example.

    Joins the Daily room returned by ``configure`` with audio input enabled
    and feeds transport input -> AssemblyAI STT -> ``TranscriptionLogger``.
    """
    async with aiohttp.ClientSession() as session:
        room_url, _ = await configure(session)

        transport = DailyTransport(
            room_url, None, "Transcription bot", DailyParams(audio_in_enabled=True)
        )

        stt = AssemblyAISTTService(api_key=os.getenv("ASSEMBLYAI_API_KEY"))

        transcription_logger = TranscriptionLogger()

        stages = [transport.input(), stt, transcription_logger]
        task = PipelineTask(Pipeline(stages))

        await PipelineRunner().run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -24,7 +24,7 @@ cp env.example .env # and add your credentials
|
||||
python server.py
|
||||
```
|
||||
|
||||
Then, visit `http://localhost:7860/start` in your browser to start a chatbot
|
||||
Then, visit `http://localhost:7860/` in your browser to start a chatbot
|
||||
session.
|
||||
|
||||
## Build and test the Docker image
|
||||
@@ -41,4 +41,4 @@ docker build -t moonbot -f Dockerfile.intel .
|
||||
docker run --env-file .env -p 7860:7860 --device /dev/dri moonbot
|
||||
```
|
||||
|
||||
You can try to visit `http://localhost:7860/start` again.
|
||||
You can try to visit `http://localhost:7860/` again.
|
||||
|
||||
@@ -57,7 +57,7 @@ app.add_middleware(
|
||||
)
|
||||
|
||||
|
||||
@app.get("/start")
|
||||
@app.get("/")
|
||||
async def start_agent(request: Request):
|
||||
print(f"!!! Creating room")
|
||||
room = await daily_helpers["rest"].create_room(DailyRoomParams())
|
||||
|
||||
@@ -54,7 +54,7 @@ cp env.example .env # and add your credentials
|
||||
python server.py
|
||||
```
|
||||
|
||||
Then, visit `http://localhost:7860/start` in your browser to start a chatbot session.
|
||||
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
|
||||
|
||||
## Build and test the Docker image
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ app.add_middleware(
|
||||
)
|
||||
|
||||
|
||||
@app.get("/start")
|
||||
@app.get("/")
|
||||
async def start_agent(request: Request):
|
||||
print(f"!!! Creating room")
|
||||
room = await daily_helpers["rest"].create_room(DailyRoomParams())
|
||||
@@ -128,7 +128,7 @@ if __name__ == "__main__":
|
||||
parser.add_argument("--reload", action="store_true", help="Reload code on change")
|
||||
|
||||
config = parser.parse_args()
|
||||
print(f"to join a test room, visit http://localhost:{config.port}/start")
|
||||
print(f"to join a test room, visit http://localhost:{config.port}/")
|
||||
uvicorn.run(
|
||||
"server:app",
|
||||
host=config.host,
|
||||
|
||||
@@ -27,7 +27,7 @@ cp env.example .env # and add your credentials
|
||||
python server.py
|
||||
```
|
||||
|
||||
Then, visit `http://localhost:7860/start` in your browser to start a chatbot session.
|
||||
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
|
||||
|
||||
## Build and test the Docker image
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ app.add_middleware(
|
||||
)
|
||||
|
||||
|
||||
@app.get("/start")
|
||||
@app.get("/")
|
||||
async def start_agent(request: Request):
|
||||
print(f"!!! Creating room")
|
||||
room = await daily_helpers["rest"].create_room(DailyRoomParams())
|
||||
|
||||
@@ -27,7 +27,7 @@ export default function Call() {
|
||||
|
||||
// Create a new room for the story session
|
||||
try {
|
||||
const response = await fetch("/start_bot", {
|
||||
const response = await fetch("/", {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
|
||||
@@ -69,7 +69,7 @@ STATIC_DIR = "frontend/out"
|
||||
app.mount("/static", StaticFiles(directory=STATIC_DIR, html=True), name="static")
|
||||
|
||||
|
||||
@app.post("/start_bot")
|
||||
@app.post("/")
|
||||
async def start_bot(request: Request) -> JSONResponse:
|
||||
if os.getenv("ENV", "dev") == "production":
|
||||
# Only allow requests from the specified domain
|
||||
|
||||
@@ -23,7 +23,7 @@ cp env.example .env # and add your credentials
|
||||
python server.py
|
||||
```
|
||||
|
||||
Then, visit `http://localhost:7860/start` in your browser to start a translatorbot session.
|
||||
Then, visit `http://localhost:7860/` in your browser to start a translatorbot session.
|
||||
|
||||
## Build and test the Docker image
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ app.add_middleware(
|
||||
)
|
||||
|
||||
|
||||
@app.get("/start")
|
||||
@app.get("/")
|
||||
async def start_agent(request: Request):
|
||||
print(f"!!! Creating room")
|
||||
room = await daily_helpers["rest"].create_room(DailyRoomParams())
|
||||
|
||||
@@ -53,7 +53,7 @@ This project is a FastAPI-based chatbot that integrates with Twilio to handle We
|
||||
```
|
||||
|
||||
2. **Update the Twilio Webhook**:
|
||||
Copy the ngrok URL and update your Twilio phone number webhook URL to `http://<ngrok_url>/start_call`.
|
||||
Copy the ngrok URL and update your Twilio phone number webhook URL to `http://<ngrok_url>/`.
|
||||
|
||||
3. **Update streams.xml**:
|
||||
Copy the ngrok URL and update templates/streams.xml with `wss://<ngrok_url>/ws`.
|
||||
|
||||
@@ -19,7 +19,7 @@ app.add_middleware(
|
||||
)
|
||||
|
||||
|
||||
@app.post("/start_call")
|
||||
@app.post("/")
|
||||
async def start_call():
|
||||
print("POST TwiML")
|
||||
return HTMLResponse(content=open("templates/streams.xml").read(), media_type="application/xml")
|
||||
|
||||
@@ -37,6 +37,7 @@ Website = "https://pipecat.ai"
|
||||
|
||||
[project.optional-dependencies]
|
||||
anthropic = [ "anthropic~=0.34.0" ]
|
||||
assemblyai = [ "assemblyai~=0.34.0" ]
|
||||
aws = [ "boto3~=1.35.27" ]
|
||||
azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
|
||||
canonical = [ "aiofiles~=24.1.0" ]
|
||||
|
||||
@@ -156,7 +156,7 @@ class PipelineTask:
|
||||
start_frame = StartFrame(
|
||||
allow_interruptions=self._params.allow_interruptions,
|
||||
enable_metrics=self._params.enable_metrics,
|
||||
enable_usage_metrics=self._params.enable_metrics,
|
||||
enable_usage_metrics=self._params.enable_usage_metrics,
|
||||
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
|
||||
clock=self._clock,
|
||||
)
|
||||
|
||||
154
src/pipecat/services/assemblyai.py
Normal file
154
src/pipecat/services/assemblyai.py
Normal file
@@ -0,0 +1,154 @@
|
||||
import asyncio
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
StartFrame,
|
||||
TranscriptionFrame,
|
||||
)
|
||||
from pipecat.services.ai_services import STTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
|
||||
try:
|
||||
import assemblyai as aai
|
||||
from assemblyai import AudioEncoding
|
||||
except ModuleNotFoundError as e:
|
||||
logger.error(f"Exception: {e}")
|
||||
logger.error(
|
||||
"In order to use AssemblyAI, you need to `pip install pipecat-ai[assemblyai]`. Also, set `ASSEMBLYAI_API_KEY` environment variable."
|
||||
)
|
||||
raise Exception(f"Missing module: {e}")
|
||||
|
||||
|
||||
class AssemblyAISTTService(STTService):
    """Speech-to-text service that streams audio to an AssemblyAI real-time transcriber.

    Audio chunks are forwarded with ``RealtimeTranscriber.stream``; results
    arrive through callbacks and are pushed downstream as
    ``InterimTranscriptionFrame`` or ``TranscriptionFrame`` objects.
    """

    def __init__(
        self,
        *,
        api_key: str,
        sample_rate: int = 16000,
        encoding: AudioEncoding = AudioEncoding("pcm_s16le"),
        language=Language.EN,  # Only English is supported for Realtime
        **kwargs,
    ):
        """Store the connection settings; no connection is opened here.

        :param api_key: AssemblyAI API key, assigned to ``aai.settings.api_key``.
        :param sample_rate: Sample rate passed to the transcriber.
        :param encoding: Audio encoding passed to the transcriber.
        :param language: Language attached to emitted transcription frames.
        :param kwargs: Forwarded to ``STTService.__init__``.
        """
        super().__init__(**kwargs)

        aai.settings.api_key = api_key
        self._transcriber: aai.RealtimeTranscriber | None = None
        # The event loop used by the thread callbacks is captured in
        # _connect(), where a running loop is guaranteed. Calling
        # asyncio.get_event_loop() here is deprecated when no loop is running
        # and could bind the callbacks to a loop that never runs the pipeline.
        self._loop: asyncio.AbstractEventLoop | None = None

        self._settings = {
            "sample_rate": sample_rate,
            "encoding": encoding,
            "language": language,
        }

    async def set_language(self, language: Language):
        """Record the new language and reconnect to the service.

        NOTE(review): the language is only attached to emitted frames; it is
        not passed to the transcriber in this class.
        """
        logger.info(f"Switching STT language to: [{language}]")
        self._settings["language"] = language
        await self._disconnect()
        await self._connect()

    async def start(self, frame: StartFrame):
        """Run the base-class start handling, then connect to AssemblyAI."""
        await super().start(frame)
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Run the base-class stop handling, then disconnect."""
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Run the base-class cancel handling, then disconnect."""
        await super().cancel(frame)
        await self._disconnect()

    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
        """
        Process an audio chunk for STT transcription.

        This method streams the audio data to AssemblyAI for real-time transcription.
        Transcription results are handled asynchronously via callback functions.

        :param audio: Audio data as bytes
        :yield: None (transcription frames are pushed via self.push_frame in callbacks)
        """
        if self._transcriber:
            await self.start_processing_metrics()
            self._transcriber.stream(audio)
            await self.stop_processing_metrics()
        yield None

    async def _connect(self):
        """
        Establish a connection to the AssemblyAI real-time transcription service.

        This method sets up the necessary callback functions and initializes the
        AssemblyAI transcriber.
        """
        # Bind the callbacks to the loop that is actually running this
        # coroutine so run_coroutine_threadsafe() targets the right loop.
        loop = asyncio.get_running_loop()
        self._loop = loop

        def on_open(session_opened: aai.RealtimeSessionOpened):
            """Callback for when the connection to AssemblyAI is opened."""
            logger.info(f"{self}: Connected to AssemblyAI")

        def on_data(transcript: aai.RealtimeTranscript):
            """
            Callback for handling incoming transcription data.

            This function runs in a separate thread from the main asyncio event loop.
            It creates appropriate transcription frames and schedules them to be
            pushed to the next stage of the pipeline in the main event loop.
            """
            if not transcript.text:
                return

            timestamp = time_now_iso8601()

            if isinstance(transcript, aai.RealtimeFinalTranscript):
                frame = TranscriptionFrame(
                    transcript.text, "", timestamp, self._settings["language"]
                )
            else:
                frame = InterimTranscriptionFrame(
                    transcript.text, "", timestamp, self._settings["language"]
                )

            # Schedule the coroutine to run in the main event loop
            # This is necessary because this callback runs in a different thread
            asyncio.run_coroutine_threadsafe(self.push_frame(frame), loop)

        def on_error(error: aai.RealtimeError):
            """
            Callback for handling errors from AssemblyAI.

            Like on_data, this runs in a separate thread and schedules error
            handling in the main event loop.
            """
            logger.error(f"{self}: An error occurred: {error}")
            # Schedule the coroutine to run in the main event loop
            asyncio.run_coroutine_threadsafe(self.push_frame(ErrorFrame(str(error))), loop)

        def on_close():
            """Callback for when the connection to AssemblyAI is closed."""
            logger.info(f"{self}: Disconnected from AssemblyAI")

        self._transcriber = aai.RealtimeTranscriber(
            sample_rate=self._settings["sample_rate"],
            encoding=self._settings["encoding"],
            on_data=on_data,
            on_error=on_error,
            on_open=on_open,
            on_close=on_close,
        )
        self._transcriber.connect()

    async def _disconnect(self):
        """Disconnect from the AssemblyAI service and clean up resources."""
        if self._transcriber:
            self._transcriber.close()
            self._transcriber = None
|
||||
1
src/pipecat/workflow/.gitignore
vendored
Normal file
1
src/pipecat/workflow/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
*.json
|
||||
1
src/pipecat/workflow/README.md
Normal file
1
src/pipecat/workflow/README.md
Normal file
@@ -0,0 +1 @@
|
||||
Run the workflow test with `python -m pipecat.workflow.workflow_test`.
|
||||
0
src/pipecat/workflow/__init__.py
Normal file
0
src/pipecat/workflow/__init__.py
Normal file
18
src/pipecat/workflow/workflow_mapping.py
Normal file
18
src/pipecat/workflow/workflow_mapping.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from ..services.cartesia import CartesiaTTSService
|
||||
from ..services.openai import OpenAILLMService
|
||||
from ..services.deepgram import DeepgramSTTService
|
||||
from ..transports.services.daily import DailyTransport
|
||||
from ..processors.frame_processor import FrameProcessor
|
||||
|
||||
# Map workflow types to their corresponding Python classes
|
||||
WORKFLOW_MAPPING = {
|
||||
"inputs/audio_input": DailyTransport,
|
||||
"processors/speech_to_text": DeepgramSTTService,
|
||||
"processors/llm": OpenAILLMService,
|
||||
"processors/text_to_speech": CartesiaTTSService,
|
||||
"outputs/audio_output": DailyTransport,
|
||||
}
|
||||
|
||||
|
||||
def get_processor_class(node_type: str) -> type[FrameProcessor]:
    """Return the class registered in ``WORKFLOW_MAPPING`` for ``node_type``.

    Node types without a mapping entry fall back to ``FrameProcessor``.
    """
    try:
        return WORKFLOW_MAPPING[node_type]
    except KeyError:
        return FrameProcessor
|
||||
65
src/pipecat/workflow/workflow_test.py
Normal file
65
src/pipecat/workflow/workflow_test.py
Normal file
@@ -0,0 +1,65 @@
|
||||
import asyncio
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
from ..pipeline.pipeline import Pipeline
|
||||
from ..pipeline.runner import PipelineRunner
|
||||
from ..pipeline.task import PipelineTask, PipelineParams
|
||||
from .workflow_translator import translate_workflow
|
||||
from ..services.openai import OpenAIUserContextAggregator
|
||||
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def main():
    """Translate ``workflow.json`` (next to this file) into a pipeline and run it.

    Progress is reported with ``print`` at every step. Errors raised while the
    runner is executing are printed rather than propagated.

    :raises RuntimeError: if the translated workflow contains no
        ``OpenAIUserContextAggregator``.
    """
    print("Starting workflow test")

    # Update the path to the workflow.json file
    script_dir = os.path.dirname(os.path.abspath(__file__))
    workflow_path = os.path.join(script_dir, "workflow.json")
    print(f"Workflow path: {workflow_path}")

    # Translate the workflow to a list of processors
    print("Translating workflow to processors")
    processors, daily_transport = translate_workflow(workflow_path)
    print(f"Processors created: {processors}")

    # Create a pipeline from the processors
    print("Creating pipeline")
    pipeline = Pipeline(processors)
    print(f"Pipeline created: {pipeline}")

    # Create a pipeline task
    print("Creating pipeline task")
    task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
    print(f"Pipeline task created: {task}")

    # Create a pipeline runner
    print("Creating pipeline runner")
    runner = PipelineRunner()
    print(f"Pipeline runner created: {runner}")

    # A bare next() on an exhausted generator raises StopIteration, which a
    # coroutine turns into an opaque "coroutine raised StopIteration"
    # RuntimeError. Use a default and report the real problem instead.
    user_context_aggregator = next(
        (p for p in processors if isinstance(p, OpenAIUserContextAggregator)), None
    )
    if user_context_aggregator is None:
        raise RuntimeError(
            "Workflow contains no OpenAIUserContextAggregator; "
            "an LLM node linked as a target is required"
        )

    @daily_transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        transport.capture_participant_transcription(participant["id"])
        await task.queue_frames([user_context_aggregator.get_context_frame()])

    # Run the pipeline
    print("Running the pipeline")
    try:
        await runner.run(task)
        print("Pipeline execution completed successfully")
    except Exception as e:
        print(f"Error during pipeline execution: {e}")

    print("Workflow test completed")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Starting main execution")
|
||||
asyncio.run(main())
|
||||
print("Main execution completed")
|
||||
140
src/pipecat/workflow/workflow_translator.py
Normal file
140
src/pipecat/workflow/workflow_translator.py
Normal file
@@ -0,0 +1,140 @@
|
||||
import json
|
||||
import os
|
||||
|
||||
from typing import Any, Dict, List, Tuple
|
||||
from .workflow_mapping import get_processor_class
|
||||
from ..processors.frame_processor import FrameProcessor
|
||||
from ..transports.services.daily import DailyParams
|
||||
from ..processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from ..audio.vad.silero import SileroVADAnalyzer
|
||||
from ..transports.base_transport import BaseTransport
|
||||
|
||||
|
||||
def load_workflow(file_path: str) -> Dict[str, Any]:
    """Read a workflow description from a JSON file.

    :param file_path: Path of the JSON file to load.
    :return: The decoded JSON document.
    :raises Exception: Any error raised while opening or parsing the file is
        printed and then re-raised unchanged.
    """
    print(f"Loading workflow from file: {file_path}")
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(file_path, "r", encoding="utf-8") as f:
            workflow = json.load(f)
    except Exception as e:
        print(f"Error loading workflow: {e}")
        raise
    print(f"Workflow loaded successfully: {workflow}")
    return workflow
|
||||
|
||||
|
||||
def create_processor(node: Dict[str, Any], next_node: Dict[str, Any] = None) -> FrameProcessor:
    """Instantiate the processor class mapped to a workflow node.

    Constructor arguments are chosen from the node's ``type``: audio input
    nodes get Daily room settings, speech-to-text and text-to-speech nodes get
    their API keys from the environment, and every other type is constructed
    with no arguments.

    :param node: Workflow node; its ``id`` and ``type`` keys are read.
    :param next_node: Unused; kept for interface compatibility.
    :return: The newly constructed processor.
    """
    node_type = node["type"]
    print(f"Creating processor for node: {node['id']} of type: {node_type}")
    processor_class = get_processor_class(node_type)
    print(f"Processor class: {processor_class}")

    # Extract relevant properties for initialization
    init_params = {}
    if node_type == "inputs/audio_input":
        init_params = {
            "room_url": os.getenv("DAILY_SAMPLE_ROOM_URL"),
            "token": "",
            "bot_name": "PipecatBot",
            "params": DailyParams(
                audio_out_enabled=True,
                vad_enabled=True,
                vad_audio_passthrough=True,
                vad_analyzer=SileroVADAnalyzer(),
            ),
        }
    elif node_type == "processors/speech_to_text":
        init_params = {
            "api_key": os.getenv("DEEPGRAM_API_KEY"),
        }
    elif node_type == "processors/text_to_speech":
        init_params = {
            "api_key": os.getenv("CARTESIA_API_KEY"),
            "voice_id": "79a125e8-cd45-4c13-8a67-188112f4dd22",
        }

    # Never write credentials to stdout: mask non-empty secrets before logging.
    secret_keys = ("api_key", "token")
    loggable_params = {
        key: ("***" if key in secret_keys and value else value)
        for key, value in init_params.items()
    }
    print(f"Initialization parameters: {loggable_params}")
    processor = processor_class(**init_params)
    print(f"Processor created: {processor}")

    return processor
|
||||
|
||||
|
||||
def create_pipeline(workflow: Dict[str, Any]) -> Tuple[List[FrameProcessor], BaseTransport]:
    """Build an ordered list of pipeline stages from a workflow description.

    One processor is created per node. The audio output node reuses the
    transport created for the audio input node, and an LLM node also gets a
    context aggregator. Stages are then appended in the order the links
    mention their nodes, each node at most once.

    :param workflow: Mapping with a ``nodes`` list (each with ``id`` and
        ``type``) and a ``links`` list.
    :return: ``(stages, daily_transport)``; the transport is ``None`` when the
        workflow has no audio input node.
    :raises ValueError: if an audio output node precedes the audio input node.
    :raises KeyError: if a link names a source node that does not exist.
    """
    print("Creating pipeline from workflow")
    nodes = {node["id"]: node for node in workflow["nodes"]}
    links = workflow["links"]

    print(f"Nodes: {nodes}")
    print(f"Links: {links}")

    # Create a dictionary to store processors
    processors = {}
    daily_transport = None
    llm_service = None
    context_aggregator = None

    # Create processors for each node
    for node_id, node in nodes.items():
        print(f"Creating processor for node: {node_id}")

        if node["type"] == "inputs/audio_input":
            daily_transport = create_processor(node)
            processors[node_id] = {"processor": daily_transport, "type": node["type"]}
        elif node["type"] == "outputs/audio_output":
            if daily_transport is None:
                raise ValueError("Audio output transport node found before audio input node")
            processors[node_id] = {"processor": daily_transport, "type": node["type"]}
        elif node["type"] == "processors/llm":
            llm_service = create_processor(node)
            processors[node_id] = {"processor": llm_service, "type": node["type"]}
            context = OpenAILLMContext(
                [
                    {
                        "role": "system",
                        "content": "You are a helpful assistant. Your name is Housecat. You are participating in a voice conversation. Keep your answers brief. For punctuation use only period, comma, and question mark.",
                    },
                    {"role": "user", "content": "Introduce yourself."},
                ]
            )
            context_aggregator = llm_service.create_context_aggregator(context)
            print(f"Context aggregator created: {context_aggregator}")
        else:
            processors[node_id] = {"processor": create_processor(node), "type": node["type"]}

    # Create the pipeline based on the links
    pipeline = []
    # Track placed nodes by id. The transport object itself is never appended
    # (only its .input()/.output() stages are), so an identity check against
    # the pipeline cannot detect that a transport node was already placed.
    placed_ids = set()
    for link in links:
        # assumes each link is a 6-item sequence with the source node id at
        # index 0 and the target node id at index 3 — TODO confirm against
        # the workflow.json format.
        source_id, _, _, target_id, _, _ = link
        print(f"Processing link: {source_id} -> {target_id}")

        if source_id not in placed_ids:
            source = processors[source_id]
            print(f"Adding source processor: {source_id}, {source['processor']}")
            if source["type"] == "inputs/audio_input":
                pipeline.append(source["processor"].input())
            else:
                pipeline.append(source["processor"])
            placed_ids.add(source_id)

        # Check membership first so links to unknown targets are skipped
        # instead of raising KeyError.
        if target_id in processors and target_id not in placed_ids:
            target = processors[target_id]
            print(f"Adding target processor: {target_id} {target['processor']}")
            if target["type"] == "outputs/audio_output":
                pipeline.append(target["processor"].output())
            elif target["type"] == "processors/llm":
                print("TRYING TO LINK AGGREGATOR")
                if context_aggregator:
                    print("AGGREGATOR FOUND")
                    pipeline.append(context_aggregator.user())
                pipeline.append(target["processor"])
                # NOTE(review): context_aggregator.assistant() is never added
                # to the pipeline — confirm whether that is intended.
            else:
                pipeline.append(target["processor"])
            placed_ids.add(target_id)

    print(f"Pipeline created with {len(pipeline)} processors")
    print(f"Pipeline: {pipeline}")

    return pipeline, daily_transport
|
||||
|
||||
|
||||
def translate_workflow(file_path: str) -> Tuple[List[FrameProcessor], BaseTransport]:
    """Load the workflow JSON at ``file_path`` and turn it into pipeline stages.

    :param file_path: Path of the workflow JSON file.
    :return: The ``(stages, transport)`` pair produced by ``create_pipeline``.
    """
    print(f"Translating workflow from file: {file_path}")
    result = create_pipeline(load_workflow(file_path))
    print("Workflow translation completed")
    stages, daily_transport = result
    return stages, daily_transport
|
||||
Reference in New Issue
Block a user