From 4fbeb5fbcbf240e845fc8b96cc96cf1b72eb74d9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?=
Date: Wed, 13 May 2026 23:26:31 -0700
Subject: [PATCH] Add remote-proxy-assistant example

Demonstrates the WebSocket proxy tasks: a local `main.py` voice bot uses
`WebSocketProxyClientTask` to forward bus messages (including
`BusFrameMessage`s) to a remote `assistant.py` FastAPI server. Each
incoming connection spawns a `WebSocketProxyServerTask` plus an
`LLMTask` assistant on a per-session `PipelineRunner`.
---
Review notes (free-form text in the area after the three-dash line):

* Task names are paired across the two files. main.py names its
  PipelineTask MAIN_NAME = "acme" and configures the client proxy with
  local_task_name=MAIN_NAME, remote_task_name="assistant". assistant.py
  configures the server proxy with task_name="assistant",
  remote_task_name="acme", and AcmeAssistant passes "assistant" to
  LLMTask.__init__. NOTE(review): these presumably have to stay in sync
  when either side is renamed; confirm against the proxy task docs.
* Both proxies are created with forward_messages=(BusFrameMessage,).
* assistant.py builds one PipelineRunner(handle_sigint=False), one
  WebSocketProxyServerTask and one AcmeAssistant per accepted WebSocket
  connection; the proxy's on_client_disconnected handler calls
  runner.cancel().
* AcmeAssistant.__init__ reads os.environ["OPENAI_API_KEY"] and is only
  constructed inside websocket_endpoint, after websocket.accept(). A
  missing key therefore raises KeyError on the first connection rather
  than when the server starts.
* assistant.py accepts every connection on /ws unconditionally and
  --host defaults to 0.0.0.0. NOTE(review): fine for a local example;
  flagging in case this gets copied into a deployment.
* main.py activation sequence as written: the transport's
  on_client_connected handler calls task.activate_task("proxy");
  runner.registry.watch("assistant", ...) registers on_assistant_ready,
  which calls task.activate_task("assistant", ...) with a single
  "developer" message asking the assistant to welcome the user.
* main.py reads DEEPGRAM_API_KEY and CARTESIA_API_KEY via os.environ[...]
  inside run_bot, matching the Requirements list in its module docstring.

 .../remote-proxy-assistant/assistant.py       | 118 ++++++++++++
 .../multi-task/remote-proxy-assistant/main.py | 169 ++++++++++++++++++
 2 files changed, 287 insertions(+)
 create mode 100644 examples/multi-task/remote-proxy-assistant/assistant.py
 create mode 100644 examples/multi-task/remote-proxy-assistant/main.py

diff --git a/examples/multi-task/remote-proxy-assistant/assistant.py b/examples/multi-task/remote-proxy-assistant/assistant.py
new file mode 100644
index 000000000..b7e728e39
--- /dev/null
+++ b/examples/multi-task/remote-proxy-assistant/assistant.py
@@ -0,0 +1,118 @@
+#
+# Copyright (c) 2026, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+"""Remote assistant LLM server.
+
+Runs a FastAPI server that accepts WebSocket connections from a
+``main.py``-style client. Each connection spins up a
+`WebSocketProxyServerTask` bridging the socket to a local
+`PipelineRunner` and an `LLMTask` that handles the conversation.
+
+Usage::
+
+    python assistant.py
+    python assistant.py --port 9000
+
+Requirements:
+
+- OPENAI_API_KEY
+"""
+
+import argparse
+import os
+
+import uvicorn
+from dotenv import load_dotenv
+from fastapi import FastAPI, WebSocket
+from loguru import logger
+
+from pipecat.bus import BusFrameMessage
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.services.llm_service import FunctionCallParams
+from pipecat.services.openai.llm import OpenAILLMService
+from pipecat.tasks.llm import LLMTask, tool
+from pipecat.tasks.proxy.websocket import WebSocketProxyServerTask
+
+load_dotenv(override=True)
+
+app = FastAPI()
+
+
+class AcmeAssistant(LLMTask):
+    """Handles greetings, product questions, and conversation end."""
+
+    def __init__(self):
+        """Initialize the AcmeAssistant LLM task."""
+        llm = OpenAILLMService(
+            api_key=os.environ["OPENAI_API_KEY"],
+            settings=OpenAILLMService.Settings(
+                system_instruction=(
+                    "You are a friendly assistant for Acme Corp. You know about three "
+                    "products: Acme Rocket Boots (jet-powered boots, $299, run up to "
+                    "60 mph), Acme Invisible Paint (makes anything invisible for 24 hours, "
+                    "$49 per can), and Acme Tornado Kit (portable tornado generator, $199, "
+                    "batteries included). Greet the user, help them with product questions, "
+                    "and call end_conversation when the user says goodbye. "
+                    "Keep responses brief, this is a voice conversation."
+                ),
+            ),
+        )
+        super().__init__("assistant", llm=llm, bridged=())
+
+    @tool
+    async def end_conversation(self, params: FunctionCallParams, reason: str):
+        """End the conversation when the user says goodbye.
+
+        Args:
+            reason (str): Why the conversation is ending.
+        """
+        logger.info(f"Task '{self.name}': ending conversation ({reason})")
+        await self.end(
+            reason=reason,
+            messages=[{"role": "developer", "content": reason}],
+            result_callback=params.result_callback,
+        )
+
+
+@app.websocket("/ws")
+async def websocket_endpoint(websocket: WebSocket):
+    """Handle a WebSocket connection from the main bot's proxy."""
+    await websocket.accept()
+
+    runner = PipelineRunner(handle_sigint=False)
+
+    proxy = WebSocketProxyServerTask(
+        "gateway",
+        websocket=websocket,
+        task_name="assistant",
+        remote_task_name="acme",
+        forward_messages=(BusFrameMessage,),
+    )
+
+    @proxy.event_handler("on_client_connected")
+    async def on_client_connected(proxy, client):
+        logger.info("WebSocket client connected")
+
+    @proxy.event_handler("on_client_disconnected")
+    async def on_client_disconnected(proxy, client):
+        logger.info("WebSocket client disconnected")
+        await runner.cancel()
+
+    assistant = AcmeAssistant()
+
+    await runner.spawn(proxy)
+    logger.info("Assistant server ready, waiting for activation")
+    await runner.run(assistant)
+    logger.info("Assistant server session ended")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Remote assistant LLM server")
+    parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
+    parser.add_argument("--port", type=int, default=8765, help="Port to listen on")
+    args = parser.parse_args()
+
+    uvicorn.run(app, host=args.host, port=args.port)
diff --git a/examples/multi-task/remote-proxy-assistant/main.py b/examples/multi-task/remote-proxy-assistant/main.py
new file mode 100644
index 000000000..57afef6b5
--- /dev/null
+++ b/examples/multi-task/remote-proxy-assistant/main.py
@@ -0,0 +1,169 @@
+#
+# Copyright (c) 2026, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+"""Main transport task with a WebSocket proxy to a remote LLM server.
+
+Handles audio I/O (STT, TTS) and bridges frames to the bus. A
+`WebSocketProxyClientTask` forwards bus messages to a remote LLM
+server (see ``assistant.py``) over WebSocket.
+
+Usage::
+
+    python main.py --remote-url ws://localhost:8765/ws
+
+Requirements:
+
+- DEEPGRAM_API_KEY
+- CARTESIA_API_KEY
+"""
+
+import argparse
+import os
+
+from dotenv import load_dotenv
+from loguru import logger
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.bus import BusBridgeProcessor, BusFrameMessage
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.llm_context import LLMContext
+from pipecat.processors.aggregators.llm_response_universal import (
+    LLMContextAggregatorPair,
+    LLMUserAggregatorParams,
+)
+from pipecat.registry.types import TaskReadyData
+from pipecat.runner.types import RunnerArguments
+from pipecat.runner.utils import create_transport
+from pipecat.services.cartesia.tts import CartesiaTTSService
+from pipecat.services.deepgram.stt import DeepgramSTTService
+from pipecat.tasks.llm import LLMTaskActivationArgs
+from pipecat.tasks.proxy.websocket import WebSocketProxyClientTask
+from pipecat.transports.base_transport import BaseTransport, TransportParams
+from pipecat.transports.daily.transport import DailyParams
+
+load_dotenv(override=True)
+
+MAIN_NAME = "acme"
+
+transport_params = {
+    "daily": lambda: DailyParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+    ),
+    "webrtc": lambda: TransportParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+    ),
+}
+
+
+async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
+    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
+
+    stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
+    tts = CartesiaTTSService(
+        api_key=os.environ["CARTESIA_API_KEY"],
+        settings=CartesiaTTSService.Settings(
+            voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",  # Jacqueline
+        ),
+    )
+
+    context = LLMContext()
+    aggregators = LLMContextAggregatorPair(
+        context,
+        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
+    )
+
+    bridge = BusBridgeProcessor(
+        bus=runner.bus,
+        task_name=MAIN_NAME,
+        name=f"{MAIN_NAME}::BusBridge",
+    )
+
+    pipeline = Pipeline(
+        [
+            transport.input(),
+            stt,
+            aggregators.user(),
+            bridge,
+            tts,
+            transport.output(),
+            aggregators.assistant(),
+        ]
+    )
+
+    task = PipelineTask(
+        pipeline,
+        name=MAIN_NAME,
+        params=PipelineParams(
+            enable_metrics=True,
+            enable_usage_metrics=True,
+        ),
+        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
+    )
+
+    # Forward bus frame messages over the WebSocket so the remote
+    # assistant sees user-side context and can ship back its replies.
+    proxy = WebSocketProxyClientTask(
+        "proxy",
+        url=runner_args.cli_args.remote_url,
+        local_task_name=MAIN_NAME,
+        remote_task_name="assistant",
+        forward_messages=(BusFrameMessage,),
+    )
+    await runner.spawn(proxy)
+
+    async def on_assistant_ready(_data: TaskReadyData) -> None:
+        logger.info("Remote assistant ready, activating")
+        await task.activate_task(
+            "assistant",
+            args=LLMTaskActivationArgs(
+                messages=[
+                    {
+                        "role": "developer",
+                        "content": (
+                            "Welcome the user to Acme Corp, mention the available "
+                            "products and ask how you can help."
+                        ),
+                    },
+                ],
+            ),
+        )
+
+    await runner.registry.watch("assistant", on_assistant_ready)
+
+    @transport.event_handler("on_client_connected")
+    async def on_client_connected(transport, client):
+        logger.info("Client connected, activating proxy")
+        await task.activate_task("proxy")
+
+    @transport.event_handler("on_client_disconnected")
+    async def on_client_disconnected(transport, client):
+        logger.info("Client disconnected")
+        await task.cancel()
+
+    await runner.run(task)
+
+
+async def bot(runner_args: RunnerArguments):
+    """Main bot entry point compatible with Pipecat Cloud."""
+    transport = await create_transport(runner_args, transport_params)
+    await run_bot(transport, runner_args)
+
+
+if __name__ == "__main__":
+    from pipecat.runner.run import main
+
+    parser = argparse.ArgumentParser(description="Main transport task with WebSocket proxy")
+    parser.add_argument(
+        "--remote-url",
+        default="ws://localhost:8765/ws",
+        help="WebSocket URL of the remote LLM server",
+    )
+
+    main(parser)