Add remote-proxy-assistant example

Demonstrates the WebSocket proxy tasks: a local `main.py` voice
bot uses `WebSocketProxyClientTask` to forward bus messages
(including `BusFrameMessage`s) to a remote `assistant.py`
FastAPI server. Each incoming connection spawns a
`WebSocketProxyServerTask` plus an `LLMTask` assistant on a
per-session `PipelineRunner`.
This commit is contained in:
Aleix Conchillo Flaqué
2026-05-13 23:26:31 -07:00
parent 4509caa724
commit 4fbeb5fbcb
2 changed files with 287 additions and 0 deletions

View File

@@ -0,0 +1,118 @@
#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Remote assistant LLM server.
Runs a FastAPI server that accepts WebSocket connections from a
``main.py``-style client. Each connection spins up a
`WebSocketProxyServerTask` bridging the socket to a local
`PipelineRunner` and an `LLMTask` that handles the conversation.
Usage::
python assistant.py
python assistant.py --port 9000
Requirements:
- OPENAI_API_KEY
"""
import argparse
import os
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, WebSocket
from loguru import logger
from pipecat.bus import BusFrameMessage
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.tasks.llm import LLMTask, tool
from pipecat.tasks.proxy.websocket import WebSocketProxyServerTask
load_dotenv(override=True)
app = FastAPI()
class AcmeAssistant(LLMTask):
"""Handles greetings, product questions, and conversation end."""
def __init__(self):
"""Initialize the AcmeAssistant LLM task."""
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAILLMService.Settings(
system_instruction=(
"You are a friendly assistant for Acme Corp. You know about three "
"products: Acme Rocket Boots (jet-powered boots, $299, run up to "
"60 mph), Acme Invisible Paint (makes anything invisible for 24 hours, "
"$49 per can), and Acme Tornado Kit (portable tornado generator, $199, "
"batteries included). Greet the user, help them with product questions, "
"and call end_conversation when the user says goodbye. "
"Keep responses brief, this is a voice conversation."
),
),
)
super().__init__("assistant", llm=llm, bridged=())
@tool
async def end_conversation(self, params: FunctionCallParams, reason: str):
"""End the conversation when the user says goodbye.
Args:
reason (str): Why the conversation is ending.
"""
logger.info(f"Task '{self.name}': ending conversation ({reason})")
await self.end(
reason=reason,
messages=[{"role": "developer", "content": reason}],
result_callback=params.result_callback,
)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""Handle a WebSocket connection from the main bot's proxy."""
await websocket.accept()
runner = PipelineRunner(handle_sigint=False)
proxy = WebSocketProxyServerTask(
"gateway",
websocket=websocket,
task_name="assistant",
remote_task_name="acme",
forward_messages=(BusFrameMessage,),
)
@proxy.event_handler("on_client_connected")
async def on_client_connected(proxy, client):
logger.info("WebSocket client connected")
@proxy.event_handler("on_client_disconnected")
async def on_client_disconnected(proxy, client):
logger.info("WebSocket client disconnected")
await runner.cancel()
assistant = AcmeAssistant()
await runner.spawn(proxy)
logger.info("Assistant server ready, waiting for activation")
await runner.run(assistant)
logger.info("Assistant server session ended")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Remote assistant LLM server")
parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
parser.add_argument("--port", type=int, default=8765, help="Port to listen on")
args = parser.parse_args()
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -0,0 +1,169 @@
#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Main transport task with a WebSocket proxy to a remote LLM server.
Handles audio I/O (STT, TTS) and bridges frames to the bus. A
`WebSocketProxyClientTask` forwards bus messages to a remote LLM
server (see ``assistant.py``) over WebSocket.
Usage::
python main.py --remote-url ws://localhost:8765/ws
Requirements:
- DEEPGRAM_API_KEY
- CARTESIA_API_KEY
"""
import argparse
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.bus import BusBridgeProcessor, BusFrameMessage
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.registry.types import TaskReadyData
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.tasks.llm import LLMTaskActivationArgs
from pipecat.tasks.proxy.websocket import WebSocketProxyClientTask
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
MAIN_NAME = "acme"
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc", # Jacqueline
),
)
context = LLMContext()
aggregators = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
bridge = BusBridgeProcessor(
bus=runner.bus,
task_name=MAIN_NAME,
name=f"{MAIN_NAME}::BusBridge",
)
pipeline = Pipeline(
[
transport.input(),
stt,
aggregators.user(),
bridge,
tts,
transport.output(),
aggregators.assistant(),
]
)
task = PipelineTask(
pipeline,
name=MAIN_NAME,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Forward bus frame messages over the WebSocket so the remote
# assistant sees user-side context and can ship back its replies.
proxy = WebSocketProxyClientTask(
"proxy",
url=runner_args.cli_args.remote_url,
local_task_name=MAIN_NAME,
remote_task_name="assistant",
forward_messages=(BusFrameMessage,),
)
await runner.spawn(proxy)
async def on_assistant_ready(_data: TaskReadyData) -> None:
logger.info("Remote assistant ready, activating")
await task.activate_task(
"assistant",
args=LLMTaskActivationArgs(
messages=[
{
"role": "developer",
"content": (
"Welcome the user to Acme Corp, mention the available "
"products and ask how you can help."
),
},
],
),
)
await runner.registry.watch("assistant", on_assistant_ready)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected, activating proxy")
await task.activate_task("proxy")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
parser = argparse.ArgumentParser(description="Main transport task with WebSocket proxy")
parser.add_argument(
"--remote-url",
default="ws://localhost:8765/ws",
help="WebSocket URL of the remote LLM server",
)
main(parser)