Compare commits
2 Commits
hush/usage
...
khk/load-j
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b20687e32a | ||
|
|
388b3a239b |
1
src/pipecat/workflow/.gitignore
vendored
Normal file
1
src/pipecat/workflow/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
|||||||
|
*.json
|
||||||
1
src/pipecat/workflow/README.md
Normal file
1
src/pipecat/workflow/README.md
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Run the workflow test with `python -m pipecat.workflow.workflow_test`. It expects a `workflow.json` file in this directory.
|
||||||
0
src/pipecat/workflow/__init__.py
Normal file
0
src/pipecat/workflow/__init__.py
Normal file
18
src/pipecat/workflow/workflow_mapping.py
Normal file
18
src/pipecat/workflow/workflow_mapping.py
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
from ..services.cartesia import CartesiaTTSService
|
||||||
|
from ..services.openai import OpenAILLMService
|
||||||
|
from ..services.deepgram import DeepgramSTTService
|
||||||
|
from ..transports.services.daily import DailyTransport
|
||||||
|
from ..processors.frame_processor import FrameProcessor
|
||||||
|
|
||||||
|
# Lookup table from workflow node type strings to the class that is
# instantiated for that node. The audio input and audio output node types
# both resolve to DailyTransport.
WORKFLOW_MAPPING: dict[str, type] = {
    "inputs/audio_input": DailyTransport,
    "processors/speech_to_text": DeepgramSTTService,
    "processors/llm": OpenAILLMService,
    "processors/text_to_speech": CartesiaTTSService,
    "outputs/audio_output": DailyTransport,
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_processor_class(node_type: str) -> type[FrameProcessor]:
    """Return the class registered for ``node_type`` in ``WORKFLOW_MAPPING``.

    Node types that have no entry in the mapping resolve to the base
    ``FrameProcessor`` class instead of raising.
    """
    try:
        return WORKFLOW_MAPPING[node_type]
    except KeyError:
        # Unknown node type: fall back to the generic processor class.
        return FrameProcessor
|
||||||
65
src/pipecat/workflow/workflow_test.py
Normal file
65
src/pipecat/workflow/workflow_test.py
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from ..pipeline.pipeline import Pipeline
|
||||||
|
from ..pipeline.runner import PipelineRunner
|
||||||
|
from ..pipeline.task import PipelineTask, PipelineParams
|
||||||
|
from .workflow_translator import translate_workflow
|
||||||
|
from ..services.openai import OpenAIUserContextAggregator
|
||||||
|
|
||||||
|
|
||||||
|
load_dotenv(override=True)
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
    """Translate ``workflow.json`` into a pipeline and run it.

    The workflow file is looked up in the directory containing this module.
    The translated processors are wrapped in a ``Pipeline`` / ``PipelineTask``
    and executed by a ``PipelineRunner``. When the first participant joins,
    their transcription is captured and the user context frame is queued.

    Raises:
        RuntimeError: if the translated workflow produced no transport or
            contains no ``OpenAIUserContextAggregator``.
    """
    print("Starting workflow test")

    # The workflow file lives next to this module.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    workflow_path = os.path.join(script_dir, "workflow.json")
    print(f"Workflow path: {workflow_path}")

    # Translate the workflow to a list of processors
    print("Translating workflow to processors")
    processors, daily_transport = translate_workflow(workflow_path)
    print(f"Processors created: {processors}")

    # Fail early with a clear message rather than an AttributeError on None
    # when the event handler is registered below.
    if daily_transport is None:
        raise RuntimeError("Workflow did not produce a transport (no audio input node)")

    # A bare next() on an exhausted generator raises StopIteration, which a
    # coroutine turns into an opaque RuntimeError; use a default instead.
    user_context_aggregator = next(
        (p for p in processors if isinstance(p, OpenAIUserContextAggregator)),
        None,
    )
    if user_context_aggregator is None:
        raise RuntimeError("Workflow pipeline contains no OpenAIUserContextAggregator")

    # Create a pipeline from the processors
    print("Creating pipeline")
    pipeline = Pipeline(processors)
    print(f"Pipeline created: {pipeline}")

    # Create a pipeline task
    print("Creating pipeline task")
    task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
    print(f"Pipeline task created: {task}")

    # Create a pipeline runner
    print("Creating pipeline runner")
    runner = PipelineRunner()
    print(f"Pipeline runner created: {runner}")

    @daily_transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        transport.capture_participant_transcription(participant["id"])
        # Queue the context frame built by the user context aggregator.
        await task.queue_frames([user_context_aggregator.get_context_frame()])

    # Run the pipeline
    print("Running the pipeline")
    try:
        await runner.run(task)
        print("Pipeline execution completed successfully")
    except Exception as e:
        # Top-level boundary of the test script: report the failure and
        # fall through to the final message.
        print(f"Error during pipeline execution: {e}")

    print("Workflow test completed")
|
||||||
|
|
||||||
|
|
||||||
|
# Script entry point: run the async workflow test to completion.
if __name__ == "__main__":
    print("Starting main execution")
    asyncio.run(main())
    print("Main execution completed")
|
||||||
140
src/pipecat/workflow/workflow_translator.py
Normal file
140
src/pipecat/workflow/workflow_translator.py
Normal file
@@ -0,0 +1,140 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
from typing import Any, Dict, List, Tuple
|
||||||
|
from .workflow_mapping import get_processor_class
|
||||||
|
from ..processors.frame_processor import FrameProcessor
|
||||||
|
from ..transports.services.daily import DailyParams
|
||||||
|
from ..processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||||
|
from ..audio.vad.silero import SileroVADAnalyzer
|
||||||
|
from ..transports.base_transport import BaseTransport
|
||||||
|
|
||||||
|
|
||||||
|
def load_workflow(file_path: str) -> Dict[str, Any]:
    """Read a workflow description from a JSON file.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The decoded JSON document.

    Raises:
        Exception: any error raised while opening or parsing the file is
            printed and then re-raised unchanged (e.g. ``FileNotFoundError``,
            ``json.JSONDecodeError``).
    """
    print(f"Loading workflow from file: {file_path}")
    try:
        # Decode explicitly as UTF-8 instead of relying on the locale default.
        with open(file_path, "r", encoding="utf-8") as f:
            workflow = json.load(f)
    except Exception as e:
        print(f"Error loading workflow: {e}")
        raise
    print(f"Workflow loaded successfully: {workflow}")
    return workflow
|
||||||
|
|
||||||
|
|
||||||
|
def create_processor(node: Dict[str, Any], next_node: Dict[str, Any] = None) -> FrameProcessor:
    """Instantiate the class mapped to a workflow node.

    The class is resolved with ``get_processor_class(node["type"])`` and
    constructed with keyword arguments chosen by node type:

    * ``inputs/audio_input``: room URL from ``DAILY_SAMPLE_ROOM_URL``, an
      empty token, a fixed bot name and ``DailyParams`` with audio output
      and Silero VAD enabled.
    * ``processors/speech_to_text``: API key from ``DEEPGRAM_API_KEY``.
    * ``processors/text_to_speech``: API key from ``CARTESIA_API_KEY`` and a
      fixed voice id.
    * any other type: no arguments.

    Args:
        node: Workflow node; ``node["id"]`` and ``node["type"]`` are read.
        next_node: Not used by this function; kept so existing callers that
            pass it keep working.

    Returns:
        The newly constructed instance.
    """
    node_type = node["type"]
    print(f"Creating processor for node: {node['id']} of type: {node_type}")
    processor_class = get_processor_class(node_type)
    print(f"Processor class: {processor_class}")

    # Extract relevant properties for initialization
    init_params = {}
    if node_type == "inputs/audio_input":
        init_params = {
            "room_url": os.getenv("DAILY_SAMPLE_ROOM_URL"),
            "token": "",
            "bot_name": "PipecatBot",
            "params": DailyParams(
                audio_out_enabled=True,
                vad_enabled=True,
                vad_audio_passthrough=True,
                vad_analyzer=SileroVADAnalyzer(),
            ),
        }
    elif node_type == "processors/speech_to_text":
        init_params = {
            "api_key": os.getenv("DEEPGRAM_API_KEY"),
        }
    elif node_type == "processors/text_to_speech":
        init_params = {
            "api_key": os.getenv("CARTESIA_API_KEY"),
            "voice_id": "79a125e8-cd45-4c13-8a67-188112f4dd22",
        }

    # Log parameter names only: the values include API keys read from the
    # environment and must not be written to stdout.
    print(f"Initialization parameters: {list(init_params)}")
    processor = processor_class(**init_params)
    print(f"Processor created: {processor}")

    return processor
|
||||||
|
|
||||||
|
|
||||||
|
def create_pipeline(workflow: Dict[str, Any]) -> Tuple[List[FrameProcessor], BaseTransport]:
    """Build an ordered processor list from a workflow description.

    One processor is created per entry of ``workflow["nodes"]``; the audio
    output node reuses the transport created for the audio input node. The
    list is then assembled by walking ``workflow["links"]`` in order and
    appending each link's source and target node the first time it is seen.
    The transport contributes ``input()`` / ``output()`` for the audio
    input / output nodes, and the LLM node is preceded by the user side of
    its context aggregator when it is added as a link target.

    Args:
        workflow: Mapping with a ``"nodes"`` list (each node has ``"id"``
            and ``"type"``) and a ``"links"`` list of 6-item sequences.

    Returns:
        ``(pipeline, transport)`` where ``transport`` is the object created
        for the audio input node, or ``None`` if there is no such node.

    Raises:
        ValueError: if an audio output node is encountered before any audio
            input node has been created.
        KeyError: if a link's source id does not match any node.
    """
    print("Creating pipeline from workflow")
    nodes = {node["id"]: node for node in workflow["nodes"]}
    links = workflow["links"]

    print(f"Nodes: {nodes}")
    print(f"Links: {links}")

    # node id -> {"processor": instance, "type": node type}
    processors = {}
    daily_transport = None
    context_aggregator = None

    # Create processors for each node
    for node_id, node in nodes.items():
        print(f"Creating processor for node: {node_id}")
        node_type = node["type"]

        if node_type == "inputs/audio_input":
            daily_transport = create_processor(node)
            processor = daily_transport
        elif node_type == "outputs/audio_output":
            # The output side shares the transport built for the input node.
            if daily_transport is None:
                raise ValueError("Audio output transport node found before audio input node")
            processor = daily_transport
        elif node_type == "processors/llm":
            processor = create_processor(node)
            context = OpenAILLMContext(
                [
                    {
                        "role": "system",
                        "content": "You are a helpful assistant. Your name is Housecat. You are participating in a voice conversation. Keep your answers brief. For punctuation use only period, comma, and question mark.",
                    },
                    {"role": "user", "content": "Introduce yourself."},
                ]
            )
            context_aggregator = processor.create_context_aggregator(context)
            print(f"Context aggregator created: {context_aggregator}")
        else:
            processor = create_processor(node)

        processors[node_id] = {"processor": processor, "type": node_type}

    # Create the pipeline based on the links
    pipeline = []
    # Track placed nodes by id. The pipeline holds transport.input() /
    # transport.output() rather than the transport itself, so checking the
    # stored processor object against the pipeline cannot detect that a
    # transport endpoint was already added.
    added_ids = set()

    for link in links:
        # NOTE(review): positions 0 and 3 of each link are taken as the
        # source and target node ids. Confirm this matches how workflow.json
        # is serialized (some graph editors store a link id at position 0).
        source_id, _, _, target_id, _, _ = link
        print(f"Processing link: {source_id} -> {target_id}")

        if source_id not in added_ids:
            source = processors[source_id]
            print(f"Adding source processor: {source_id}, {source['processor']}")
            if source["type"] == "inputs/audio_input":
                pipeline.append(source["processor"].input())
            else:
                pipeline.append(source["processor"])
            added_ids.add(source_id)

        # Check membership before indexing so links to unknown targets are
        # skipped instead of raising KeyError.
        if target_id in processors and target_id not in added_ids:
            target = processors[target_id]
            print(f"Adding target processor: {target_id} {target['processor']}")
            if target["type"] == "outputs/audio_output":
                pipeline.append(target["processor"].output())
            elif target["type"] == "processors/llm":
                print("TRYING TO LINK AGGREGATOR")
                if context_aggregator:
                    print("AGGREGATOR FOUND")
                    # The user-side aggregator goes directly before the LLM.
                    pipeline.append(context_aggregator.user())
                pipeline.append(target["processor"])
            else:
                pipeline.append(target["processor"])
            added_ids.add(target_id)

    print(f"Pipeline created with {len(pipeline)} processors")
    print(f"Pipeline: {pipeline}")

    return pipeline, daily_transport
|
||||||
|
|
||||||
|
|
||||||
|
def translate_workflow(file_path: str) -> Tuple[List[FrameProcessor], BaseTransport]:
    """Load a workflow JSON file and turn it into a processor list.

    Args:
        file_path: Path of the workflow JSON file.

    Returns:
        The ``(pipeline, transport)`` pair produced by ``create_pipeline``
        for the loaded workflow.
    """
    print(f"Translating workflow from file: {file_path}")
    loaded = load_workflow(file_path)
    result = create_pipeline(loaded)
    print("Workflow translation completed")
    processor_list, daily_transport = result
    return processor_list, daily_transport
|
||||||
Reference in New Issue
Block a user