From 267c86e596398997a573bb7ca2b399ba549a68c8 Mon Sep 17 00:00:00 2001 From: mattie ruth backman Date: Wed, 4 Feb 2026 14:53:46 -0500 Subject: [PATCH] support RTVI files uploads larger than the transport can handle This PR introduces: 1. a new /files/ POST endpoint in the local runner that supports uploading a file to a folder that must be provided at runtime 2. By default, the runner will allow a maximum 10 files to be saved 3. Added logic to the send-file handler in RTVI to read a file from disk if the file provide is a url starting with '/files/' --- src/pipecat/frames/frames.py | 2 +- .../processors/frameworks/rtvi/models.py | 14 +- .../processors/frameworks/rtvi/processor.py | 52 ++++++-- src/pipecat/runner/run.py | 125 ++++++++++++++++-- .../transports/smallwebrtc/transport.py | 2 +- 5 files changed, 168 insertions(+), 27 deletions(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 6773eb9fc..ed6e83b6e 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -244,7 +244,7 @@ class ImageRawFrame: format: Optional[str] -FileSourceType = Literal["bytes", "url"] # TODO: Add support for "id" +FileSourceType = Literal["bytes", "url"] @dataclass diff --git a/src/pipecat/processors/frameworks/rtvi/models.py b/src/pipecat/processors/frameworks/rtvi/models.py index 95a97638f..4e9b458f9 100644 --- a/src/pipecat/processors/frameworks/rtvi/models.py +++ b/src/pipecat/processors/frameworks/rtvi/models.py @@ -258,7 +258,17 @@ class File(BaseModel): format: str # Mime format of the file, e.g., 'application/pdf' name: Optional[str] = None source: FileBytes | FileUrl - customOpts: Optional[dict] = None # ex. 'detail' in openAI or 'citations' in Bedrock + + +class SendFileOptions(BaseModel): + """Options for sending text input to the LLM. + + Contains options for how the pipeline should process the text input. + """ + + run_immediately: bool = True + audio_response: bool = True + custom_options: Optional[dict] = None # ex. 'detail' in openAI or 'citations' in Bedrock class SendFileData(BaseModel): @@ -269,7 +279,7 @@ class SendFileData(BaseModel): content: str # Text to accompany the file file: File - options: Optional[SendTextOptions] = None + options: Optional[SendFileOptions] = None class AppendToContextData(BaseModel): diff --git a/src/pipecat/processors/frameworks/rtvi/processor.py b/src/pipecat/processors/frameworks/rtvi/processor.py index d885b955a..0bff83708 100644 --- a/src/pipecat/processors/frameworks/rtvi/processor.py +++ b/src/pipecat/processors/frameworks/rtvi/processor.py @@ -8,6 +8,7 @@ import asyncio import base64 +import os from typing import Any, Dict, Mapping, Optional from loguru import logger @@ -72,6 +73,7 @@ class RTVIProcessor(FrameProcessor): *, config: Optional[RTVIConfig] = None, transport: Optional[BaseTransport] = None, + uploads_folder: Optional[str] = None, **kwargs, ): """Initialize the RTVI processor. @@ -79,10 +81,14 @@ class RTVIProcessor(FrameProcessor): Args: config: Initial RTVI configuration. transport: Transport layer for communication. + uploads_folder: Path to folder where client uploads (e.g. POST /files) are + stored; required for send-file with /files/ URLs. Use + runner_uploads_folder() when using the development runner. **kwargs: Additional arguments passed to parent class. """ super().__init__(**kwargs) self._config = config or RTVIConfig(config=[]) + self._folder = uploads_folder or "" self._bot_ready = False self._client_ready = False @@ -564,18 +570,43 @@ class RTVIProcessor(FrameProcessor): async def _handle_send_file(self, data: RTVI.SendFileData): """Handle a send-file message from the client.""" file = data.file - source = None - if file.source.type == "bytes": + type = file.source.type + opts = data.options if data.options is not None else RTVI.SendFileOptions() + + if type == "bytes": source = file.source.bytes - elif file.source.type == "url": - source = file.source.url + elif type == "url": + if file.source.url.startswith("/files/"): + if not self._folder: + logger.warning( + "Send-file with /files/ URL requires uploads_folder on RTVIProcessor " + "(e.g. uploads_folder=runner_uploads_folder())." + ) + return + # read bytes from file system, encode to base64, then delete the file + type = "bytes" + file_path = os.path.join(self._folder, file.source.url.removeprefix("/files/")) + with open(file_path, "rb") as f: + raw_bytes = f.read() + encoded_image = base64.b64encode(raw_bytes).decode("utf-8") + source = f"data:{file.format};base64,{encoded_image}" + try: + os.remove(file_path) + except OSError as e: + logger.warning(f"Failed to remove uploaded file {file_path}: {e}") + else: + source = file.source.url else: - logger.warning(f"Unsupported file source type: {file.source.type}") + logger.warning(f"Unsupported file source type: {type}") return - if file.source.type == "bytes" and file.format.startswith("image/"): - size = [file.source.width or 0, file.source.height or 0] + if type == "bytes" and file.format.startswith("image/"): + # Only access width/height if the original source is RTVIFileBytes (not RTVIFileUrl) + if file.source.type == "bytes": + size = [file.source.width or 0, file.source.height or 0] + else: + size = [0, 0] file_frame = UserImageRawFrame( text=data.content, image=source, @@ -587,14 +618,15 @@ class RTVIProcessor(FrameProcessor): file_frame = UserFileRawFrame( text=data.content, file=source, - type=file.source.type, + type=type, format=file.format, - custom_options=file.customOpts, + custom_options=opts.custom_options, append_to_context=True, ) - opts = data.options if data.options is not None else RTVI.SendTextOptions() + if opts.run_immediately: await self.interrupt_bot() + cur_llm_skip_tts = self._llm_skip_tts should_skip_tts = not opts.audio_response toggle_skip_tts = cur_llm_skip_tts != should_skip_tts diff --git a/src/pipecat/runner/run.py b/src/pipecat/runner/run.py index 8714fc338..40b8bc6f8 100644 --- a/src/pipecat/runner/run.py +++ b/src/pipecat/runner/run.py @@ -90,7 +90,16 @@ from pipecat.runner.types import ( try: import uvicorn from dotenv import load_dotenv - from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Request, WebSocket + from fastapi import ( + BackgroundTasks, + FastAPI, + File, + Header, + HTTPException, + Request, + UploadFile, + WebSocket, + ) from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, RedirectResponse except ImportError as e: @@ -107,6 +116,7 @@ os.environ["ENV"] = "local" TELEPHONY_TRANSPORTS = ["twilio", "telnyx", "plivo", "exotel"] RUNNER_DOWNLOADS_FOLDER: Optional[str] = None +RUNNER_UPLOADS_FOLDER: Optional[str] = None RUNNER_HOST: str = "localhost" RUNNER_PORT: int = 7860 @@ -164,6 +174,25 @@ async def _run_telephony_bot(websocket: WebSocket, args: argparse.Namespace): await bot_module.bot(runner_args) +def _trim_uploads_folder(folder: Path, max_files: int): + """Keep only the most recent max_files files in folder; delete oldest by mtime.""" + if max_files <= 0: + return + try: + files = [p for p in folder.iterdir() if p.is_file()] + if len(files) <= max_files: + return + by_mtime = sorted(files, key=lambda p: p.stat().st_mtime) + for p in by_mtime[: len(files) - max_files]: + try: + p.unlink() + logger.debug(f"Trimmed upload {p.name} from {folder}") + except OSError as e: + logger.warning(f"Failed to trim upload {p}: {e}") + except OSError as e: + logger.warning(f"Failed to list uploads folder {folder}: {e}") + + def _create_server_app(args: argparse.Namespace): """Create FastAPI app with transport-specific routes.""" app = FastAPI() @@ -230,18 +259,60 @@ def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace): @app.get("/files/{filename:path}") async def download_file(filename: str): - """Handle file downloads.""" - if not args.folder: - logger.warning(f"Attempting to dowload {filename}, but downloads folder not setup.") - return - - file_path = Path(args.folder) / filename - if not os.path.exists(file_path): + """Serve a file from uploads or downloads folder (uploads checked first).""" + # Sanitize: no path traversal + safe_name = Path(filename).name + if not safe_name or safe_name != filename: + raise HTTPException(404) + file_path = None + if args.uploads_folder: + candidate = Path(args.uploads_folder) / safe_name + if candidate.is_file(): + file_path = candidate + if file_path is None and args.downloads_folder: + candidate = Path(args.downloads_folder) / safe_name + if candidate.is_file(): + file_path = candidate + if file_path is None: + if not args.uploads_folder and not args.downloads_folder: + logger.warning( + f"Attempting to get {filename}, but no uploads or downloads folder set." + ) raise HTTPException(404) media_type, _ = mimetypes.guess_type(file_path) - return FileResponse(path=file_path, media_type=media_type, filename=filename) + return FileResponse(path=file_path, media_type=media_type, filename=safe_name) + + @app.post("/files") + async def upload_file(file: UploadFile = File(...)): + """Handle file uploads from clients. Requires --uploads-folder to be set.""" + if not args.uploads_folder: + raise HTTPException( + 503, + "File upload is disabled: start the runner with -u/--uploads-folder to set the uploads directory.", + ) + folder = Path(args.uploads_folder) + folder.mkdir(parents=True, exist_ok=True) + # Always save as UUID so uploads are not discoverable by guessing filenames + safe_name = uuid.uuid4().hex + file_path = folder / safe_name + try: + contents = await file.read() + file_path.write_bytes(contents) + logger.debug(f"Uploaded file to {file_path}") + except OSError as e: + logger.error(f"Failed to save upload: {e}") + raise HTTPException(500, "Failed to save file") from e + _trim_uploads_folder(folder, args.uploads_folder_max_files) + # Use original filename only for format/mime in response + original_name = Path(file.filename or "").name + media_type, _ = mimetypes.guess_type(original_name, strict=False) + return { + "name": original_name, + "source": f"/files/{safe_name}", + "format": media_type, + } # Initialize the SmallWebRTC request handler small_webrtc_handler: SmallWebRTCRequestHandler = SmallWebRTCRequestHandler( @@ -851,6 +922,11 @@ def runner_downloads_folder() -> Optional[str]: return RUNNER_DOWNLOADS_FOLDER +def runner_uploads_folder() -> Optional[str]: + """Returns the folder where client uploads are stored (short-lived).""" + return RUNNER_UPLOADS_FOLDER + + def runner_host() -> str: """Returns the host name of this runner.""" return RUNNER_HOST @@ -876,7 +952,9 @@ def main(parser: Optional[argparse.ArgumentParser] = None): - -t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel) - -x/--proxy: Public proxy hostname for telephony webhooks - -d/--direct: Connect directly to Daily room (automatically sets transport to daily) - - -f/--folder: Path to downloads folder + - -f/--downloads-folder: Path to folder for files available for download + - -u/--uploads-folder: Path to folder for client uploads (short-lived) + - --uploads-folder-max-files: Max files in uploads folder (default: 10) - --dialin: Enable Daily PSTN dial-in webhook handling (requires Daily transport) - --esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address) - --whatsapp: Ensure requried WhatsApp environment variables are present @@ -889,7 +967,7 @@ def main(parser: Optional[argparse.ArgumentParser] = None): ones. Custom args are accessible via `runner_args.cli_args`. """ - global RUNNER_DOWNLOADS_FOLDER, RUNNER_HOST, RUNNER_PORT + global RUNNER_DOWNLOADS_FOLDER, RUNNER_UPLOADS_FOLDER, RUNNER_HOST, RUNNER_PORT if not parser: parser = argparse.ArgumentParser(description="Pipecat Development Runner") @@ -911,7 +989,27 @@ def main(parser: Optional[argparse.ArgumentParser] = None): default=False, help="Connect directly to Daily room (automatically sets transport to daily)", ) - parser.add_argument("-f", "--folder", type=str, help="Path to downloads folder") + parser.add_argument( + "-f", + "--downloads-folder", + type=str, + dest="downloads_folder", + help="Path to folder for files available for download", + ) + parser.add_argument( + "-u", + "--uploads-folder", + type=str, + dest="uploads_folder", + help="Path to folder for client uploads (short-lived; max files enforced)", + ) + parser.add_argument( + "--uploads-folder-max-files", + type=int, + default=10, + dest="uploads_folder_max_files", + help="Max files to keep in uploads folder; oldest removed (default: 10)", + ) parser.add_argument( "-v", "--verbose", action="count", default=0, help="Increase logging verbosity" ) @@ -994,7 +1092,8 @@ def main(parser: Optional[argparse.ArgumentParser] = None): print(f" → Open http://{args.host}:{args.port} in your browser to start a session") print() - RUNNER_DOWNLOADS_FOLDER = args.folder + RUNNER_DOWNLOADS_FOLDER = args.downloads_folder + RUNNER_UPLOADS_FOLDER = args.uploads_folder RUNNER_HOST = args.host RUNNER_PORT = args.port diff --git a/src/pipecat/transports/smallwebrtc/transport.py b/src/pipecat/transports/smallwebrtc/transport.py index 36f883278..7b3094a4a 100644 --- a/src/pipecat/transports/smallwebrtc/transport.py +++ b/src/pipecat/transports/smallwebrtc/transport.py @@ -691,7 +691,7 @@ class SmallWebRTCInputTransport(BaseInputTransport): Args: message: The application message to process. """ - logger.debug(f"Received app message inside SmallWebRTCInputTransport {message}") + logger.trace(f"Received app message inside SmallWebRTCInputTransport {message}") await self.broadcast_frame(InputTransportMessageFrame, message=message) # Add this method similar to DailyInputTransport.request_participant_image