Support RTVI file uploads larger than the transport can handle

This PR introduces:
1. A new POST /files endpoint in the local runner that supports
   uploading a file to a folder that must be provided at runtime
2. By default, the runner will allow a maximum of 10 files to be
   saved; the oldest uploads are removed first
3. Added logic to the send-file handler in RTVI to read a file
   from disk if the file provided is a URL starting with '/files/'
This commit is contained in:
mattie ruth backman
2026-02-04 14:53:46 -05:00
parent 9fb06c3e4b
commit 267c86e596
5 changed files with 168 additions and 27 deletions

View File

@@ -244,7 +244,7 @@ class ImageRawFrame:
format: Optional[str]
FileSourceType = Literal["bytes", "url"] # TODO: Add support for "id"
FileSourceType = Literal["bytes", "url"]
@dataclass

View File

@@ -258,7 +258,17 @@ class File(BaseModel):
format: str # Mime format of the file, e.g., 'application/pdf'
name: Optional[str] = None
source: FileBytes | FileUrl
customOpts: Optional[dict] = None # ex. 'detail' in openAI or 'citations' in Bedrock
class SendFileOptions(BaseModel):
"""Options for sending a file input to the LLM.
Contains options for how the pipeline should process the file input.
"""
# When true, the send-file handler interrupts the bot before continuing.
run_immediately: bool = True
# The send-file handler derives its skip-TTS flag as `not audio_response`.
audio_response: bool = True
# Forwarded by the send-file handler as the file frame's custom_options.
custom_options: Optional[dict] = None # ex. 'detail' in openAI or 'citations' in Bedrock
class SendFileData(BaseModel):
@@ -269,7 +279,7 @@ class SendFileData(BaseModel):
content: str # Text to accompany the file
file: File
options: Optional[SendTextOptions] = None
options: Optional[SendFileOptions] = None
class AppendToContextData(BaseModel):

View File

@@ -8,6 +8,7 @@
import asyncio
import base64
import os
from typing import Any, Dict, Mapping, Optional
from loguru import logger
@@ -72,6 +73,7 @@ class RTVIProcessor(FrameProcessor):
*,
config: Optional[RTVIConfig] = None,
transport: Optional[BaseTransport] = None,
uploads_folder: Optional[str] = None,
**kwargs,
):
"""Initialize the RTVI processor.
@@ -79,10 +81,14 @@ class RTVIProcessor(FrameProcessor):
Args:
config: Initial RTVI configuration.
transport: Transport layer for communication.
uploads_folder: Path to folder where client uploads (e.g. POST /files) are
stored; required for send-file with /files/ URLs. Use
runner_uploads_folder() when using the development runner.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
self._config = config or RTVIConfig(config=[])
self._folder = uploads_folder or ""
self._bot_ready = False
self._client_ready = False
@@ -564,18 +570,43 @@ class RTVIProcessor(FrameProcessor):
async def _handle_send_file(self, data: RTVI.SendFileData):
"""Handle a send-file message from the client."""
file = data.file
source = None
if file.source.type == "bytes":
type = file.source.type
opts = data.options if data.options is not None else RTVI.SendFileOptions()
if type == "bytes":
source = file.source.bytes
elif file.source.type == "url":
source = file.source.url
elif type == "url":
if file.source.url.startswith("/files/"):
if not self._folder:
logger.warning(
"Send-file with /files/ URL requires uploads_folder on RTVIProcessor "
"(e.g. uploads_folder=runner_uploads_folder())."
)
return
# read bytes from file system, encode to base64, then delete the file
type = "bytes"
file_path = os.path.join(self._folder, file.source.url.removeprefix("/files/"))
with open(file_path, "rb") as f:
raw_bytes = f.read()
encoded_image = base64.b64encode(raw_bytes).decode("utf-8")
source = f"data:{file.format};base64,{encoded_image}"
try:
os.remove(file_path)
except OSError as e:
logger.warning(f"Failed to remove uploaded file {file_path}: {e}")
else:
source = file.source.url
else:
logger.warning(f"Unsupported file source type: {file.source.type}")
logger.warning(f"Unsupported file source type: {type}")
return
if file.source.type == "bytes" and file.format.startswith("image/"):
size = [file.source.width or 0, file.source.height or 0]
if type == "bytes" and file.format.startswith("image/"):
# Only access width/height if the original source is RTVIFileBytes (not RTVIFileUrl)
if file.source.type == "bytes":
size = [file.source.width or 0, file.source.height or 0]
else:
size = [0, 0]
file_frame = UserImageRawFrame(
text=data.content,
image=source,
@@ -587,14 +618,15 @@ class RTVIProcessor(FrameProcessor):
file_frame = UserFileRawFrame(
text=data.content,
file=source,
type=file.source.type,
type=type,
format=file.format,
custom_options=file.customOpts,
custom_options=opts.custom_options,
append_to_context=True,
)
opts = data.options if data.options is not None else RTVI.SendTextOptions()
if opts.run_immediately:
await self.interrupt_bot()
cur_llm_skip_tts = self._llm_skip_tts
should_skip_tts = not opts.audio_response
toggle_skip_tts = cur_llm_skip_tts != should_skip_tts

View File

@@ -90,7 +90,16 @@ from pipecat.runner.types import (
try:
import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Request, WebSocket
from fastapi import (
BackgroundTasks,
FastAPI,
File,
Header,
HTTPException,
Request,
UploadFile,
WebSocket,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse
except ImportError as e:
@@ -107,6 +116,7 @@ os.environ["ENV"] = "local"
TELEPHONY_TRANSPORTS = ["twilio", "telnyx", "plivo", "exotel"]
RUNNER_DOWNLOADS_FOLDER: Optional[str] = None
RUNNER_UPLOADS_FOLDER: Optional[str] = None
RUNNER_HOST: str = "localhost"
RUNNER_PORT: int = 7860
@@ -164,6 +174,25 @@ async def _run_telephony_bot(websocket: WebSocket, args: argparse.Namespace):
await bot_module.bot(runner_args)
def _trim_uploads_folder(folder: Path, max_files: int):
    """Keep only the most recent max_files files in folder; delete oldest by mtime.

    Only regular files directly inside ``folder`` are considered; subdirectories
    are left untouched. All failures are logged and never raised, so a trim
    problem cannot fail the upload request that triggered it.

    Args:
        folder: Directory holding the uploaded files.
        max_files: Maximum number of files to keep. A value <= 0 disables
            trimming entirely.
    """
    if max_files <= 0:
        return

    try:
        candidates = [p for p in folder.iterdir() if p.is_file()]
    except OSError as e:
        logger.warning(f"Failed to list uploads folder {folder}: {e}")
        return

    # Stat each file on its own: an upload can disappear between the listing
    # and the stat (e.g. it was consumed and deleted elsewhere). Such a file no
    # longer counts toward the limit and must not abort the whole trim.
    stamped = []
    for p in candidates:
        try:
            stamped.append((p.stat().st_mtime, p))
        except OSError as e:
            logger.debug(f"Skipping upload {p} while trimming: {e}")

    excess = len(stamped) - max_files
    if excess <= 0:
        return

    # Oldest first; the sort is stable, so ties keep directory-listing order.
    stamped.sort(key=lambda item: item[0])
    for _, p in stamped[:excess]:
        try:
            p.unlink()
            logger.debug(f"Trimmed upload {p.name} from {folder}")
        except OSError as e:
            logger.warning(f"Failed to trim upload {p}: {e}")
def _create_server_app(args: argparse.Namespace):
"""Create FastAPI app with transport-specific routes."""
app = FastAPI()
@@ -230,18 +259,60 @@ def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
@app.get("/files/{filename:path}")
async def download_file(filename: str):
"""Handle file downloads."""
if not args.folder:
logger.warning(f"Attempting to dowload {filename}, but downloads folder not setup.")
return
file_path = Path(args.folder) / filename
if not os.path.exists(file_path):
"""Serve a file from uploads or downloads folder (uploads checked first)."""
# Sanitize: no path traversal
safe_name = Path(filename).name
if not safe_name or safe_name != filename:
raise HTTPException(404)
file_path = None
if args.uploads_folder:
candidate = Path(args.uploads_folder) / safe_name
if candidate.is_file():
file_path = candidate
if file_path is None and args.downloads_folder:
candidate = Path(args.downloads_folder) / safe_name
if candidate.is_file():
file_path = candidate
if file_path is None:
if not args.uploads_folder and not args.downloads_folder:
logger.warning(
f"Attempting to get {filename}, but no uploads or downloads folder set."
)
raise HTTPException(404)
media_type, _ = mimetypes.guess_type(file_path)
return FileResponse(path=file_path, media_type=media_type, filename=filename)
return FileResponse(path=file_path, media_type=media_type, filename=safe_name)
@app.post("/files")
async def upload_file(file: UploadFile = File(...)):
    """Handle file uploads from clients. Requires --uploads-folder to be set.

    The upload is stored under a random UUID name in the uploads folder and the
    folder is then trimmed to the configured maximum number of files.

    Args:
        file: The multipart upload sent by the client.

    Returns:
        A dict with the original file ``name``, the ``source`` URL
        (``/files/<uuid>``) under which the upload was stored, and the guessed
        mime ``format``.

    Raises:
        HTTPException: 503 if no uploads folder is configured, 500 if the file
            could not be written.
    """
    if not args.uploads_folder:
        raise HTTPException(
            503,
            "File upload is disabled: start the runner with -u/--uploads-folder to set the uploads directory.",
        )

    folder = Path(args.uploads_folder)
    # Always save as UUID so uploads are not discoverable by guessing filenames
    safe_name = uuid.uuid4().hex
    file_path = folder / safe_name

    # This endpoint exists for files too large for the transport, so copy the
    # upload in bounded chunks rather than holding the whole body in memory.
    chunk_size = 1024 * 1024
    try:
        folder.mkdir(parents=True, exist_ok=True)
        with file_path.open("wb") as out:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                out.write(chunk)
        logger.debug(f"Uploaded file to {file_path}")
    except OSError as e:
        logger.error(f"Failed to save upload: {e}")
        # Do not leave a truncated file behind; it would count toward the
        # max-files limit and could be served or consumed as if complete.
        try:
            file_path.unlink()
        except OSError:
            pass
        raise HTTPException(500, "Failed to save file") from e

    _trim_uploads_folder(folder, args.uploads_folder_max_files)

    # Use original filename only for format/mime in response
    original_name = Path(file.filename or "").name
    media_type, _ = mimetypes.guess_type(original_name, strict=False)
    if not media_type:
        # Extension unknown or missing: fall back to the content type the
        # client declared so the response always carries a usable format.
        media_type = file.content_type or "application/octet-stream"
    return {
        "name": original_name,
        "source": f"/files/{safe_name}",
        "format": media_type,
    }
# Initialize the SmallWebRTC request handler
small_webrtc_handler: SmallWebRTCRequestHandler = SmallWebRTCRequestHandler(
@@ -851,6 +922,11 @@ def runner_downloads_folder() -> Optional[str]:
return RUNNER_DOWNLOADS_FOLDER
def runner_uploads_folder() -> Optional[str]:
    """Return the runner's short-lived client uploads folder, or None if unset."""
    return RUNNER_UPLOADS_FOLDER
def runner_host() -> str:
"""Returns the host name of this runner."""
return RUNNER_HOST
@@ -876,7 +952,9 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
- -t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel)
- -x/--proxy: Public proxy hostname for telephony webhooks
- -d/--direct: Connect directly to Daily room (automatically sets transport to daily)
- -f/--folder: Path to downloads folder
- -f/--downloads-folder: Path to folder for files available for download
- -u/--uploads-folder: Path to folder for client uploads (short-lived)
- --uploads-folder-max-files: Max files in uploads folder (default: 10)
- --dialin: Enable Daily PSTN dial-in webhook handling (requires Daily transport)
- --esp32: Enable SDP munging for ESP32 compatibility (requires --host with IP address)
- --whatsapp: Ensure requried WhatsApp environment variables are present
@@ -889,7 +967,7 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
ones. Custom args are accessible via `runner_args.cli_args`.
"""
global RUNNER_DOWNLOADS_FOLDER, RUNNER_HOST, RUNNER_PORT
global RUNNER_DOWNLOADS_FOLDER, RUNNER_UPLOADS_FOLDER, RUNNER_HOST, RUNNER_PORT
if not parser:
parser = argparse.ArgumentParser(description="Pipecat Development Runner")
@@ -911,7 +989,27 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
default=False,
help="Connect directly to Daily room (automatically sets transport to daily)",
)
parser.add_argument("-f", "--folder", type=str, help="Path to downloads folder")
parser.add_argument(
"-f",
"--downloads-folder",
type=str,
dest="downloads_folder",
help="Path to folder for files available for download",
)
parser.add_argument(
"-u",
"--uploads-folder",
type=str,
dest="uploads_folder",
help="Path to folder for client uploads (short-lived; max files enforced)",
)
parser.add_argument(
"--uploads-folder-max-files",
type=int,
default=10,
dest="uploads_folder_max_files",
help="Max files to keep in uploads folder; oldest removed (default: 10)",
)
parser.add_argument(
"-v", "--verbose", action="count", default=0, help="Increase logging verbosity"
)
@@ -994,7 +1092,8 @@ def main(parser: Optional[argparse.ArgumentParser] = None):
print(f" → Open http://{args.host}:{args.port} in your browser to start a session")
print()
RUNNER_DOWNLOADS_FOLDER = args.folder
RUNNER_DOWNLOADS_FOLDER = args.downloads_folder
RUNNER_UPLOADS_FOLDER = args.uploads_folder
RUNNER_HOST = args.host
RUNNER_PORT = args.port

View File

@@ -691,7 +691,7 @@ class SmallWebRTCInputTransport(BaseInputTransport):
Args:
message: The application message to process.
"""
logger.debug(f"Received app message inside SmallWebRTCInputTransport {message}")
logger.trace(f"Received app message inside SmallWebRTCInputTransport {message}")
await self.broadcast_frame(InputTransportMessageFrame, message=message)
# Add this method similar to DailyInputTransport.request_participant_image