Demonstrates the WebSocket proxy tasks: a local `main.py` voice bot uses `WebSocketProxyClientTask` to forward bus messages (including `BusFrameMessage`s) to a remote `assistant.py` FastAPI server. Each incoming connection spawns a `WebSocketProxyServerTask` plus an `LLMTask` assistant on a per-session `PipelineRunner`.
119 lines
3.7 KiB
Python
119 lines
3.7 KiB
Python
#
|
|
# Copyright (c) 2026, Daily
|
|
#
|
|
# SPDX-License-Identifier: BSD 2-Clause License
|
|
#
|
|
|
|
"""Remote assistant LLM server.
|
|
|
|
Runs a FastAPI server that accepts WebSocket connections from a
|
|
``main.py``-style client. Each connection spins up a
|
|
`WebSocketProxyServerTask` bridging the socket to a local
|
|
`PipelineRunner` and an `LLMTask` that handles the conversation.
|
|
|
|
Usage::
|
|
|
|
python assistant.py
|
|
python assistant.py --port 9000
|
|
|
|
Requirements:
|
|
|
|
- OPENAI_API_KEY
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
|
|
import uvicorn
|
|
from dotenv import load_dotenv
|
|
from fastapi import FastAPI, WebSocket
|
|
from loguru import logger
|
|
|
|
from pipecat.bus import BusFrameMessage
|
|
from pipecat.pipeline.runner import PipelineRunner
|
|
from pipecat.services.llm_service import FunctionCallParams
|
|
from pipecat.services.openai.llm import OpenAILLMService
|
|
from pipecat.tasks.llm import LLMTask, tool
|
|
from pipecat.tasks.proxy.websocket import WebSocketProxyServerTask
|
|
|
|
load_dotenv(override=True)
|
|
|
|
app = FastAPI()
|
|
|
|
|
|
class AcmeAssistant(LLMTask):
|
|
"""Handles greetings, product questions, and conversation end."""
|
|
|
|
def __init__(self):
|
|
"""Initialize the AcmeAssistant LLM task."""
|
|
llm = OpenAILLMService(
|
|
api_key=os.environ["OPENAI_API_KEY"],
|
|
settings=OpenAILLMService.Settings(
|
|
system_instruction=(
|
|
"You are a friendly assistant for Acme Corp. You know about three "
|
|
"products: Acme Rocket Boots (jet-powered boots, $299, run up to "
|
|
"60 mph), Acme Invisible Paint (makes anything invisible for 24 hours, "
|
|
"$49 per can), and Acme Tornado Kit (portable tornado generator, $199, "
|
|
"batteries included). Greet the user, help them with product questions, "
|
|
"and call end_conversation when the user says goodbye. "
|
|
"Keep responses brief, this is a voice conversation."
|
|
),
|
|
),
|
|
)
|
|
super().__init__("assistant", llm=llm, bridged=())
|
|
|
|
@tool
|
|
async def end_conversation(self, params: FunctionCallParams, reason: str):
|
|
"""End the conversation when the user says goodbye.
|
|
|
|
Args:
|
|
reason (str): Why the conversation is ending.
|
|
"""
|
|
logger.info(f"Task '{self.name}': ending conversation ({reason})")
|
|
await self.end(
|
|
reason=reason,
|
|
messages=[{"role": "developer", "content": reason}],
|
|
result_callback=params.result_callback,
|
|
)
|
|
|
|
|
|
@app.websocket("/ws")
|
|
async def websocket_endpoint(websocket: WebSocket):
|
|
"""Handle a WebSocket connection from the main bot's proxy."""
|
|
await websocket.accept()
|
|
|
|
runner = PipelineRunner(handle_sigint=False)
|
|
|
|
proxy = WebSocketProxyServerTask(
|
|
"gateway",
|
|
websocket=websocket,
|
|
task_name="assistant",
|
|
remote_task_name="acme",
|
|
forward_messages=(BusFrameMessage,),
|
|
)
|
|
|
|
@proxy.event_handler("on_client_connected")
|
|
async def on_client_connected(proxy, client):
|
|
logger.info("WebSocket client connected")
|
|
|
|
@proxy.event_handler("on_client_disconnected")
|
|
async def on_client_disconnected(proxy, client):
|
|
logger.info("WebSocket client disconnected")
|
|
await runner.cancel()
|
|
|
|
assistant = AcmeAssistant()
|
|
|
|
await runner.spawn(proxy)
|
|
logger.info("Assistant server ready, waiting for activation")
|
|
await runner.run(assistant)
|
|
logger.info("Assistant server session ended")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Remote assistant LLM server")
|
|
parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
|
|
parser.add_argument("--port", type=int, default=8765, help="Port to listen on")
|
|
args = parser.parse_args()
|
|
|
|
uvicorn.run(app, host=args.host, port=args.port)
|