Compare commits

...

13 Commits

Author SHA1 Message Date
Kwindla Hultman Kramer
b20687e32a workflow_test working except for text_input node 2024-11-01 21:56:30 -07:00
hyypeman
388b3a239b hackathon demo 2024-10-21 22:35:20 -07:00
Aleix Conchillo Flaqué
b6b1ef0a40 Merge pull request #589 from Allenmylath/patch-12
Update Dockerfile
2024-10-20 10:59:43 -07:00
Mark Backman
e62f762382 Merge pull request #625 from pipecat-ai/mb/add-assemblyai-stt
Add support for AssemblyAI STT
2024-10-20 13:59:33 -04:00
Aleix Conchillo Flaqué
dbfda14342 Merge pull request #587 from Allenmylath/patch-9
Update env.example
2024-10-20 10:58:50 -07:00
Aleix Conchillo Flaqué
fee85418cd Merge pull request #620 from gregschwartz/main
Start agent/call/bot at localhost root
2024-10-20 10:14:10 -07:00
Mark Backman
015faa3dbd Update CHANGELOG and README 2024-10-20 08:57:57 -04:00
Mark Backman
1dbf4ff27d Add AssemblyAI STT service 2024-10-20 08:57:57 -04:00
Aleix Conchillo Flaqué
4f1b2dce9b Merge pull request #624 from pvilchez/fix_enable_usage_metrics
Fixing `enable_usage_metrics` setting.
2024-10-20 01:00:12 -07:00
Paul Vilchez
5640bd9447 Fixing a config mismatch which caused usage stats to only report when enable_metrics was true. 2024-10-20 03:33:13 -04:00
Greg Schwartz
1fa52b62aa Put start agent/call at localhost root. Before you had to read in the docs to go to /start, or /start_call or /start_bot. Which isn't mentioned in the console output, and is inconsistent, adding friction to learning the codebase 2024-10-19 16:18:43 -07:00
allenmylath
ec98a13a08 Update Dockerfile
utils and assets not used in this example hence removed
2024-10-15 08:18:16 +05:30
allenmylath
b64dbe7bb4 Update env.example
canonical api url is also used from env.
2024-10-15 08:10:07 +05:30
34 changed files with 642 additions and 30 deletions

View File

@@ -5,6 +5,21 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- Added `AssemblyAISTTService` and corresponding foundational examples
`07o-interruptible-assemblyai.py` and `13d-assemblyai-transcription.py`.
- Added a foundational example for Gladia transcription:
`13c-gladia-transcription.py`
### Fixed
- Fixed `enable_usage_metrics` to control LLM/TTS usage metrics separately
from `enable_metrics`.
## [0.0.46] - 2024-10-19
### Added

View File

@@ -38,7 +38,7 @@ pip install "pipecat-ai[option,...]"
Your project may or may not need these, so they're made available as optional requirements. Here is a list:
- **AI services**: `anthropic`, `aws`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **AI services**: `anthropic`, `assemblyai`, `aws`, `azure`, `deepgram`, `gladia`, `google`, `fal`, `lmnt`, `moondream`, `openai`, `openpipe`, `playht`, `silero`, `whisper`, `xtts`
- **Transports**: `local`, `websocket`, `daily`
## Code examples

View File

@@ -1,16 +1,10 @@
FROM python:3.10-bullseye
RUN mkdir /app
RUN mkdir /app/assets
RUN mkdir /app/utils
COPY *.py /app/
COPY requirements.txt /app/
copy assets/* /app/assets/
copy utils/* /app/utils/
WORKDIR /app
RUN pip3 install -r requirements.txt
EXPOSE 7860
CMD ["python3", "server.py"]
CMD ["python3", "server.py"]

View File

@@ -27,7 +27,7 @@ cp env.example .env # and add your credentials
python server.py
```
Then, visit `http://localhost:7860/start` in your browser to start a chatbot session.
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
## Build and test the Docker image

View File

@@ -2,4 +2,5 @@ DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bo
DAILY_API_KEY=7df...
OPENAI_API_KEY=sk-PL...
ELEVENLABS_API_KEY=aeb...
CANONICAL_API_KEY=can...
CANONICAL_API_KEY=can...
CANONICAL_API_URL=

View File

@@ -59,7 +59,7 @@ app.add_middleware(
)
@app.get("/start")
@app.get("/")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())

View File

@@ -27,7 +27,7 @@ cp env.example .env # and add your credentials
python server.py
```
Then, visit `http://localhost:7860/start` in your browser to start a chatbot session.
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
## Build and test the Docker image

View File

@@ -59,7 +59,7 @@ app.add_middleware(
)
@app.get("/start")
@app.get("/")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())

View File

@@ -34,6 +34,6 @@ Note: you can do this manually via the fly.io dashboard under the "secrets" sub-
Send a post request to your running fly.io instance:
`curl --location --request POST 'https://YOUR_FLY_APP_NAME/start_bot'`
`curl --location --request POST 'https://YOUR_FLY_APP_NAME/'`
This request will wait until the machine enters a `starting` state before returning a room URL and token to join.

View File

@@ -124,7 +124,7 @@ async def spawn_fly_machine(room_url: str, token: str):
print(f"Machine joined room: {room_url}")
@app.post("/start_bot")
@app.post("/")
async def start_bot(request: Request) -> JSONResponse:
try:
data = await request.json()

View File

@@ -0,0 +1,97 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.assemblyai import AssemblyAISTTService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
    """Run an interruptible voice bot in a Daily room.

    Audio from the room is transcribed by AssemblyAI, answered by an OpenAI
    LLM, and spoken back through Cartesia TTS. The conversation starts when
    the first participant joins.
    """
    async with aiohttp.ClientSession() as http_session:
        room_url, token = await configure(http_session)

        daily_params = DailyParams(
            audio_out_enabled=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            vad_audio_passthrough=True,
        )
        transport = DailyTransport(room_url, token, "Respond bot", daily_params)

        # Services, created in pipeline order: speech-to-text, text-to-speech, LLM.
        stt = AssemblyAISTTService(api_key=os.getenv("ASSEMBLYAI_API_KEY"))
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
        )
        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

        system_prompt = {
            "role": "system",
            "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        }
        messages = [system_prompt]

        context_aggregator = llm.create_context_aggregator(OpenAILLMContext(messages))

        stages = [
            transport.input(),  # Transport user input
            stt,  # STT
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
        task = PipelineTask(Pipeline(stages), PipelineParams(allow_interruptions=True))

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            transport.capture_participant_transcription(participant["id"])
            # Kick off the conversation.
            intro_request = {"role": "system", "content": "Please introduce yourself to the user."}
            messages.append(intro_request)
            await task.queue_frames([LLMMessagesFrame(messages)])

        await PipelineRunner().run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,63 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.gladia import GladiaSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
    """Frame processor that prints the text of every final transcription frame."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Run the base-class handling, then print ``frame.text`` for transcriptions."""
        await super().process_frame(frame, direction)

        if not isinstance(frame, TranscriptionFrame):
            return
        print(f"Transcription: {frame.text}")
async def main():
    """Join a Daily room and print Gladia transcriptions of the incoming audio."""
    async with aiohttp.ClientSession() as http_session:
        # Only the room URL is needed; the bot joins without a token.
        room_url, _ = await configure(http_session)

        daily_params = DailyParams(audio_in_enabled=True)
        transport = DailyTransport(room_url, None, "Transcription bot", daily_params)

        stt = GladiaSTTService(
            api_key=os.getenv("GLADIA_API_KEY"),
            # live_options=LiveOptions(language=Language.FR),
        )

        stages = [transport.input(), stt, TranscriptionLogger()]
        task = PipelineTask(Pipeline(stages))

        await PipelineRunner().run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,62 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.assemblyai import AssemblyAISTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
class TranscriptionLogger(FrameProcessor):
    """Frame processor that prints the text of every final transcription frame."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Run the base-class handling, then print ``frame.text`` for transcriptions."""
        await super().process_frame(frame, direction)

        if not isinstance(frame, TranscriptionFrame):
            return
        print(f"Transcription: {frame.text}")
async def main():
    """Join a Daily room and print AssemblyAI transcriptions of the incoming audio."""
    async with aiohttp.ClientSession() as http_session:
        # Only the room URL is needed; the bot joins without a token.
        room_url, _ = await configure(http_session)

        daily_params = DailyParams(audio_in_enabled=True)
        transport = DailyTransport(room_url, None, "Transcription bot", daily_params)

        stt = AssemblyAISTTService(api_key=os.getenv("ASSEMBLYAI_API_KEY"))

        stages = [transport.input(), stt, TranscriptionLogger()]
        task = PipelineTask(Pipeline(stages))

        await PipelineRunner().run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -24,7 +24,7 @@ cp env.example .env # and add your credentials
python server.py
```
Then, visit `http://localhost:7860/start` in your browser to start a chatbot
Then, visit `http://localhost:7860/` in your browser to start a chatbot
session.
## Build and test the Docker image
@@ -41,4 +41,4 @@ docker build -t moonbot -f Dockerfile.intel .
docker run --env-file .env -p 7860:7860 --device /dev/dri moonbot
```
You can try to visit `http://localhost:7860/start` again.
You can try to visit `http://localhost:7860/` again.

View File

@@ -57,7 +57,7 @@ app.add_middleware(
)
@app.get("/start")
@app.get("/")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())

View File

@@ -54,7 +54,7 @@ cp env.example .env # and add your credentials
python server.py
```
Then, visit `http://localhost:7860/start` in your browser to start a chatbot session.
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
## Build and test the Docker image

View File

@@ -57,7 +57,7 @@ app.add_middleware(
)
@app.get("/start")
@app.get("/")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())
@@ -128,7 +128,7 @@ if __name__ == "__main__":
parser.add_argument("--reload", action="store_true", help="Reload code on change")
config = parser.parse_args()
print(f"to join a test room, visit http://localhost:{config.port}/start")
print(f"to join a test room, visit http://localhost:{config.port}/")
uvicorn.run(
"server:app",
host=config.host,

View File

@@ -27,7 +27,7 @@ cp env.example .env # and add your credentials
python server.py
```
Then, visit `http://localhost:7860/start` in your browser to start a chatbot session.
Then, visit `http://localhost:7860/` in your browser to start a chatbot session.
## Build and test the Docker image

View File

@@ -57,7 +57,7 @@ app.add_middleware(
)
@app.get("/start")
@app.get("/")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())

View File

@@ -27,7 +27,7 @@ export default function Call() {
// Create a new room for the story session
try {
const response = await fetch("/start_bot", {
const response = await fetch("/", {
method: "POST",
headers: {
"Content-Type": "application/json",

View File

@@ -69,7 +69,7 @@ STATIC_DIR = "frontend/out"
app.mount("/static", StaticFiles(directory=STATIC_DIR, html=True), name="static")
@app.post("/start_bot")
@app.post("/")
async def start_bot(request: Request) -> JSONResponse:
if os.getenv("ENV", "dev") == "production":
# Only allow requests from the specified domain

View File

@@ -23,7 +23,7 @@ cp env.example .env # and add your credentials
python server.py
```
Then, visit `http://localhost:7860/start` in your browser to start a translatorbot session.
Then, visit `http://localhost:7860/` in your browser to start a translatorbot session.
## Build and test the Docker image

View File

@@ -57,7 +57,7 @@ app.add_middleware(
)
@app.get("/start")
@app.get("/")
async def start_agent(request: Request):
print(f"!!! Creating room")
room = await daily_helpers["rest"].create_room(DailyRoomParams())

View File

@@ -53,7 +53,7 @@ This project is a FastAPI-based chatbot that integrates with Twilio to handle We
```
2. **Update the Twilio Webhook**:
Copy the ngrok URL and update your Twilio phone number webhook URL to `http://<ngrok_url>/start_call`.
Copy the ngrok URL and update your Twilio phone number webhook URL to `http://<ngrok_url>/`.
3. **Update streams.xml**:
Copy the ngrok URL and update templates/streams.xml with `wss://<ngrok_url>/ws`.

View File

@@ -19,7 +19,7 @@ app.add_middleware(
)
@app.post("/start_call")
@app.post("/")
async def start_call():
print("POST TwiML")
return HTMLResponse(content=open("templates/streams.xml").read(), media_type="application/xml")

View File

@@ -37,6 +37,7 @@ Website = "https://pipecat.ai"
[project.optional-dependencies]
anthropic = [ "anthropic~=0.34.0" ]
assemblyai = [ "assemblyai~=0.34.0" ]
aws = [ "boto3~=1.35.27" ]
azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
canonical = [ "aiofiles~=24.1.0" ]

View File

@@ -156,7 +156,7 @@ class PipelineTask:
start_frame = StartFrame(
allow_interruptions=self._params.allow_interruptions,
enable_metrics=self._params.enable_metrics,
enable_usage_metrics=self._params.enable_metrics,
enable_usage_metrics=self._params.enable_usage_metrics,
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
clock=self._clock,
)

View File

@@ -0,0 +1,154 @@
import asyncio
from typing import AsyncGenerator
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
)
from pipecat.services.ai_services import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
try:
import assemblyai as aai
from assemblyai import AudioEncoding
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use AssemblyAI, you need to `pip install pipecat-ai[assemblyai]`. Also, set `ASSEMBLYAI_API_KEY` environment variable."
)
raise Exception(f"Missing module: {e}")
class AssemblyAISTTService(STTService):
    """Speech-to-text service backed by AssemblyAI's real-time transcriber.

    Audio chunks are streamed to an ``aai.RealtimeTranscriber``. Results arrive
    through SDK callbacks and are forwarded into the pipeline as
    ``TranscriptionFrame`` / ``InterimTranscriptionFrame`` (or ``ErrorFrame``
    when the SDK reports an error).
    """

    def __init__(
        self,
        *,
        api_key: str,
        sample_rate: int = 16000,
        encoding: AudioEncoding = AudioEncoding("pcm_s16le"),
        language=Language.EN,  # Only English is supported for Realtime
        **kwargs,
    ):
        """Store the connection settings; the connection itself is opened in ``start``.

        :param api_key: AssemblyAI API key, applied to the SDK's global settings.
        :param sample_rate: Sample rate passed to the real-time transcriber.
        :param encoding: Audio encoding passed to the real-time transcriber.
        :param language: Language attached to the emitted transcription frames.
        :param kwargs: Forwarded unchanged to ``STTService``.
        """
        super().__init__(**kwargs)

        aai.settings.api_key = api_key
        self._transcriber: aai.RealtimeTranscriber | None = None

        # Kept for backward compatibility. The SDK callbacks no longer rely on
        # this value; they use the loop that is running when _connect() executes.
        self._loop = asyncio.get_event_loop()

        self._settings = {
            "sample_rate": sample_rate,
            "encoding": encoding,
            "language": language,
        }

    async def set_language(self, language: Language):
        """Record the new language and reconnect to the transcription service."""
        logger.info(f"Switching STT language to: [{language}]")
        self._settings["language"] = language
        await self._disconnect()
        await self._connect()

    async def start(self, frame: StartFrame):
        """Start the service and open the AssemblyAI connection."""
        await super().start(frame)
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the service and close the AssemblyAI connection."""
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the service and close the AssemblyAI connection."""
        await super().cancel(frame)
        await self._disconnect()

    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
        """
        Process an audio chunk for STT transcription.

        This method streams the audio data to AssemblyAI for real-time transcription.
        Transcription results are handled asynchronously via callback functions.

        :param audio: Audio data as bytes
        :yield: None (transcription frames are pushed via self.push_frame in callbacks)
        """
        if self._transcriber:
            await self.start_processing_metrics()
            self._transcriber.stream(audio)
            await self.stop_processing_metrics()
        yield None

    async def _connect(self):
        """
        Establish a connection to the AssemblyAI real-time transcription service.

        This method sets up the necessary callback functions and initializes the
        AssemblyAI transcriber.
        """
        # Capture the loop that is actually running this coroutine. A loop
        # obtained at construction time may be a different (never-run) loop if
        # the service was created before the pipeline's loop started, in which
        # case frames scheduled from the callbacks would never be delivered.
        loop = asyncio.get_running_loop()

        def on_open(session_opened: aai.RealtimeSessionOpened):
            """Callback for when the connection to AssemblyAI is opened."""
            logger.info(f"{self}: Connected to AssemblyAI")

        def on_data(transcript: aai.RealtimeTranscript):
            """
            Callback for handling incoming transcription data.

            This function runs in a separate thread from the main asyncio event loop.
            It creates appropriate transcription frames and schedules them to be
            pushed to the next stage of the pipeline in the main event loop.
            """
            if not transcript.text:
                return

            timestamp = time_now_iso8601()

            if isinstance(transcript, aai.RealtimeFinalTranscript):
                frame = TranscriptionFrame(
                    transcript.text, "", timestamp, self._settings["language"]
                )
            else:
                frame = InterimTranscriptionFrame(
                    transcript.text, "", timestamp, self._settings["language"]
                )

            # Schedule the coroutine to run in the pipeline's event loop.
            # This is necessary because this callback runs in a different thread.
            asyncio.run_coroutine_threadsafe(self.push_frame(frame), loop)

        def on_error(error: aai.RealtimeError):
            """
            Callback for handling errors from AssemblyAI.

            Like on_data, this runs in a separate thread and schedules error
            handling in the main event loop.
            """
            logger.error(f"{self}: An error occurred: {error}")
            # Schedule the coroutine to run in the pipeline's event loop.
            asyncio.run_coroutine_threadsafe(self.push_frame(ErrorFrame(str(error))), loop)

        def on_close():
            """Callback for when the connection to AssemblyAI is closed."""
            logger.info(f"{self}: Disconnected from AssemblyAI")

        self._transcriber = aai.RealtimeTranscriber(
            sample_rate=self._settings["sample_rate"],
            encoding=self._settings["encoding"],
            on_data=on_data,
            on_error=on_error,
            on_open=on_open,
            on_close=on_close,
        )
        self._transcriber.connect()

    async def _disconnect(self):
        """Disconnect from the AssemblyAI service and clean up resources."""
        # Drop the reference first so run_stt stops streaming into a transcriber
        # that is closing, and so a failing close() cannot leave it attached.
        transcriber, self._transcriber = self._transcriber, None
        if transcriber:
            transcriber.close()

1
src/pipecat/workflow/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
*.json

View File

@@ -0,0 +1 @@
To run the workflow test: `python -m pipecat.workflow.workflow_test`

View File

View File

@@ -0,0 +1,18 @@
from ..services.cartesia import CartesiaTTSService
from ..services.openai import OpenAILLMService
from ..services.deepgram import DeepgramSTTService
from ..transports.services.daily import DailyTransport
from ..processors.frame_processor import FrameProcessor
# Map workflow node type strings to the Python classes that implement them.
# Audio input and audio output both resolve to DailyTransport.
WORKFLOW_MAPPING = {
"inputs/audio_input": DailyTransport,
"processors/speech_to_text": DeepgramSTTService,
"processors/llm": OpenAILLMService,
"processors/text_to_speech": CartesiaTTSService,
"outputs/audio_output": DailyTransport,
}
def get_processor_class(node_type: str) -> type[FrameProcessor]:
    """Return the class registered for ``node_type`` in ``WORKFLOW_MAPPING``.

    Node types without an entry fall back to the base ``FrameProcessor``.
    """
    try:
        return WORKFLOW_MAPPING[node_type]
    except KeyError:
        return FrameProcessor

View File

@@ -0,0 +1,65 @@
import asyncio
import os
from dotenv import load_dotenv
from ..pipeline.pipeline import Pipeline
from ..pipeline.runner import PipelineRunner
from ..pipeline.task import PipelineTask, PipelineParams
from .workflow_translator import translate_workflow
from ..services.openai import OpenAIUserContextAggregator
load_dotenv(override=True)
async def main():
    """Translate ``workflow.json`` (next to this module) into a pipeline and run it.

    Raises:
        ValueError: If the translated workflow has no transport (no audio
            input node) or no user context aggregator (no LLM node). Both are
            required to register the join handler and start the conversation.
    """
    print("Starting workflow test")

    # The workflow file is resolved relative to this module, not the CWD.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    workflow_path = os.path.join(script_dir, "workflow.json")
    print(f"Workflow path: {workflow_path}")

    # Translate the workflow to a list of processors
    print("Translating workflow to processors")
    processors, daily_transport = translate_workflow(workflow_path)
    print(f"Processors created: {processors}")

    # Validate up front. A bare next() that finds nothing would raise
    # StopIteration inside this coroutine, which Python re-raises as an
    # uninformative RuntimeError; a missing transport would surface as an
    # AttributeError on None when the handler is registered.
    if daily_transport is None:
        raise ValueError("Workflow does not contain an audio input node")
    user_context_aggregator = next(
        (p for p in processors if isinstance(p, OpenAIUserContextAggregator)), None
    )
    if user_context_aggregator is None:
        raise ValueError("Workflow does not contain an LLM node with a user context aggregator")

    # Create a pipeline from the processors
    print("Creating pipeline")
    pipeline = Pipeline(processors)
    print(f"Pipeline created: {pipeline}")

    # Create a pipeline task
    print("Creating pipeline task")
    task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
    print(f"Pipeline task created: {task}")

    # Create a pipeline runner
    print("Creating pipeline runner")
    runner = PipelineRunner()
    print(f"Pipeline runner created: {runner}")

    @daily_transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        transport.capture_participant_transcription(participant["id"])
        # Queue the initial context so the LLM produces the first response.
        await task.queue_frames([user_context_aggregator.get_context_frame()])

    # Run the pipeline
    print("Running the pipeline")
    try:
        await runner.run(task)
        print("Pipeline execution completed successfully")
    except Exception as e:
        # Top-level boundary of a manual test script: report and fall through.
        print(f"Error during pipeline execution: {e}")

    print("Workflow test completed")
if __name__ == "__main__":
print("Starting main execution")
asyncio.run(main())
print("Main execution completed")

View File

@@ -0,0 +1,140 @@
import json
import os
from typing import Any, Dict, List, Tuple
from .workflow_mapping import get_processor_class
from ..processors.frame_processor import FrameProcessor
from ..transports.services.daily import DailyParams
from ..processors.aggregators.openai_llm_context import OpenAILLMContext
from ..audio.vad.silero import SileroVADAnalyzer
from ..transports.base_transport import BaseTransport
def load_workflow(file_path: str) -> Dict[str, Any]:
    """Read and parse a workflow JSON file.

    Args:
        file_path: Path to the workflow JSON file.

    Returns:
        The parsed JSON document.

    Raises:
        Exception: Any error from opening or parsing the file is printed and
            then re-raised unchanged.
    """
    print(f"Loading workflow from file: {file_path}")
    try:
        # Decode explicitly as UTF-8 instead of the platform's locale
        # encoding, so non-ASCII content loads identically everywhere.
        with open(file_path, "r", encoding="utf-8") as f:
            workflow = json.load(f)
    except Exception as e:
        print(f"Error loading workflow: {e}")
        raise
    print(f"Workflow loaded successfully: {workflow}")
    return workflow
def create_processor(node: Dict[str, Any], next_node: Dict[str, Any] = None) -> FrameProcessor:
    """Instantiate the processor class mapped to a workflow node.

    Args:
        node: Workflow node; must contain ``id`` and ``type``.
        next_node: Accepted for interface compatibility; currently unused.

    Returns:
        An instance of the class ``get_processor_class`` returns for the
        node's type, constructed with type-specific parameters read from
        environment variables where applicable.
    """
    print(f"Creating processor for node: {node['id']} of type: {node['type']}")
    node_type = node["type"]
    processor_class = get_processor_class(node_type)
    print(f"Processor class: {processor_class}")

    # Extract relevant properties for initialization
    init_params = {}
    if node_type == "inputs/audio_input":
        init_params = {
            "room_url": os.getenv("DAILY_SAMPLE_ROOM_URL"),
            "token": "",
            "bot_name": "PipecatBot",
            "params": DailyParams(
                audio_out_enabled=True,
                vad_enabled=True,
                vad_audio_passthrough=True,
                vad_analyzer=SileroVADAnalyzer(),
            ),
        }
    elif node_type == "processors/speech_to_text":
        init_params = {
            "api_key": os.getenv("DEEPGRAM_API_KEY"),
        }
    elif node_type == "processors/text_to_speech":
        init_params = {
            "api_key": os.getenv("CARTESIA_API_KEY"),
            "voice_id": "79a125e8-cd45-4c13-8a67-188112f4dd22",
        }

    # Credentials must never be echoed to stdout; mask them in the debug output.
    secret_keys = ("api_key", "token")
    printable_params = {
        key: ("<redacted>" if key in secret_keys else value)
        for key, value in init_params.items()
    }
    print(f"Initialization parameters: {printable_params}")

    processor = processor_class(**init_params)
    print(f"Processor created: {processor}")
    return processor
def create_pipeline(workflow: Dict[str, Any]) -> Tuple[List[FrameProcessor], BaseTransport]:
    """Build an ordered processor list from a parsed workflow.

    One processor is created per node. The audio output node reuses the
    transport created for the audio input node. Links are then walked in
    order, and each node's pipeline stage is appended the first time the node
    appears as a link source or target.

    Args:
        workflow: Parsed workflow with a ``nodes`` list (each node having
            ``id`` and ``type``) and a ``links`` list of 6-element sequences.

    Returns:
        ``(pipeline, daily_transport)``: the ordered stages, and the transport
        created for the audio input node (``None`` if there is none).

    Raises:
        ValueError: If an audio output node is seen before any audio input node.
        KeyError: If a link's source refers to a node that does not exist.
    """
    print("Creating pipeline from workflow")
    nodes = {node["id"]: node for node in workflow["nodes"]}
    links = workflow["links"]

    print(f"Nodes: {nodes}")
    print(f"Links: {links}")

    # node id -> {"processor": instance, "type": node type}
    processors = {}
    daily_transport = None
    context_aggregator = None

    # Create processors for each node
    for node_id, node in nodes.items():
        print(f"Creating processor for node: {node_id}")
        node_type = node["type"]
        if node_type == "inputs/audio_input":
            daily_transport = create_processor(node)
            processors[node_id] = {"processor": daily_transport, "type": node_type}
        elif node_type == "outputs/audio_output":
            if daily_transport is None:
                raise ValueError("Audio output transport node found before audio input node")
            # Input and output are two ends of the same transport instance.
            processors[node_id] = {"processor": daily_transport, "type": node_type}
        elif node_type == "processors/llm":
            llm_service = create_processor(node)
            processors[node_id] = {"processor": llm_service, "type": node_type}
            context = OpenAILLMContext(
                [
                    {
                        "role": "system",
                        "content": "You are a helpful assistant. Your name is Housecat. You are participating in a voice conversation. Keep your answers brief. For punctuation use only period, comma, and question mark.",
                    },
                    {"role": "user", "content": "Introduce yourself."},
                ]
            )
            context_aggregator = llm_service.create_context_aggregator(context)
            print(f"Context aggregator created: {context_aggregator}")
        else:
            processors[node_id] = {"processor": create_processor(node), "type": node_type}

    # Create the pipeline based on the links.
    # Placement is tracked per node id. Checking whether the processor object
    # is already in the list does not work for the transport: only its
    # .input() / .output() stages are appended, never the transport itself, so
    # those stages could be added once per link.
    # NOTE(review): element 0 of each link is treated as the source node id.
    # If the file is a LiteGraph export, element 0 is the link id and element 1
    # the origin node id; confirm against the workflow.json format.
    # NOTE(review): context_aggregator.assistant() is never added to the
    # pipeline; confirm whether assistant replies should be recorded.
    pipeline = []
    placed_node_ids = set()
    for link in links:
        source_id, _, _, target_id, _, _ = link
        print(f"Processing link: {source_id} -> {target_id}")

        if source_id not in placed_node_ids:
            source = processors[source_id]
            print(f"Adding source processor: {source_id}, {source['processor']}")
            if source["type"] == "inputs/audio_input":
                pipeline.append(source["processor"].input())
            else:
                pipeline.append(source["processor"])
            placed_node_ids.add(source_id)

        # Check existence BEFORE indexing so links to unknown targets are
        # skipped rather than raising KeyError.
        if target_id in processors and target_id not in placed_node_ids:
            target = processors[target_id]
            print(f"Adding target processor: {target_id} {target['processor']}")
            if target["type"] == "outputs/audio_output":
                pipeline.append(target["processor"].output())
            elif target["type"] == "processors/llm":
                print("TRYING TO LINK AGGREGATOR")
                if context_aggregator:
                    print("AGGREGATOR FOUND")
                    # The user aggregator must sit directly before the LLM.
                    pipeline.append(context_aggregator.user())
                pipeline.append(target["processor"])
            else:
                pipeline.append(target["processor"])
            placed_node_ids.add(target_id)

    print(f"Pipeline created with {len(pipeline)} processors")
    print(f"Pipeline: {pipeline}")
    return pipeline, daily_transport
def translate_workflow(file_path: str) -> Tuple[List[FrameProcessor], BaseTransport]:
    """Load the workflow file at ``file_path`` and convert it to pipeline stages.

    Returns the ``(pipeline, transport)`` pair produced by ``create_pipeline``.
    """
    print(f"Translating workflow from file: {file_path}")
    result = create_pipeline(load_workflow(file_path))
    print("Workflow translation completed")
    stages, transport = result
    return stages, transport