fix: validate download path containment in runner

Resolve and contain the user-supplied filename before serving it from
the runner's /files endpoint. Also raise a 404 (instead of returning
None) when the downloads folder is unset, and use the resolved
basename for Content-Disposition.
This commit is contained in:
Mark Backman
2026-05-04 16:20:27 -04:00
parent b363b91d12
commit e780f759d0
2 changed files with 111 additions and 5 deletions

View File

@@ -209,6 +209,17 @@ def _configure_server_app(args: argparse.Namespace):
logger.warning(f"Unknown transport type: {args.transport}")
def _resolve_download_path(folder: str, filename: str) -> Path:
"""Resolve a download path and ensure it stays within the downloads folder."""
allowed_base = Path(folder).resolve()
file_path = (allowed_base / filename).resolve()
if not file_path.is_relative_to(allowed_base):
raise HTTPException(status_code=403, detail="Access denied")
return file_path
def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
"""Set up WebRTC-specific routes."""
try:
@@ -250,16 +261,16 @@ def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
async def download_file(filename: str):
"""Handle file downloads."""
if not args.folder:
logger.warning(f"Attempting to dowload {filename}, but downloads folder not setup.")
return
logger.warning(f"Attempting to download {filename}, but downloads folder not setup.")
raise HTTPException(404)
file_path = Path(args.folder) / filename
if not os.path.exists(file_path):
file_path = _resolve_download_path(args.folder, filename)
if not file_path.exists():
raise HTTPException(404)
media_type, _ = mimetypes.guess_type(file_path)
return FileResponse(path=file_path, media_type=media_type, filename=filename)
return FileResponse(path=file_path, media_type=media_type, filename=file_path.name)
# Initialize the SmallWebRTC request handler
small_webrtc_handler: SmallWebRTCRequestHandler = SmallWebRTCRequestHandler(

View File

@@ -0,0 +1,95 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from fastapi import FastAPI, HTTPException
from fastapi.testclient import TestClient
from pipecat.runner.run import _resolve_download_path, _setup_webrtc_routes
class TestRunnerDownloads(unittest.TestCase):
def test_resolve_download_path_allows_files_inside_folder(self):
with tempfile.TemporaryDirectory() as tmpdir:
downloads = Path(tmpdir) / "downloads"
nested = downloads / "nested"
nested.mkdir(parents=True)
file_path = nested / "recording.txt"
file_path.write_text("session transcript")
resolved = _resolve_download_path(str(downloads), "nested/recording.txt")
self.assertEqual(resolved, file_path.resolve())
def test_resolve_download_path_blocks_parent_traversal(self):
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir)
downloads = root / "downloads"
downloads.mkdir()
(root / "secret.txt").write_text("secret")
with self.assertRaises(HTTPException) as context:
_resolve_download_path(str(downloads), "../secret.txt")
self.assertEqual(context.exception.status_code, 403)
def test_download_file_returns_404_when_folder_not_configured(self):
app = FastAPI()
args = SimpleNamespace(folder=None, esp32=False, host="127.0.0.1")
_setup_webrtc_routes(app, args)
response = TestClient(app).get("/files/recording.txt")
self.assertEqual(response.status_code, 404)
def test_resolve_download_path_blocks_decoded_encoded_slashes(self):
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir)
downloads = root / "media"
downloads.mkdir()
outside = root / "outside"
outside.mkdir()
(outside / "secret.txt").write_text("secret")
with self.assertRaises(HTTPException) as context:
_resolve_download_path(str(downloads), "../outside/secret.txt")
self.assertEqual(context.exception.status_code, 403)
def test_resolve_download_path_blocks_absolute_paths(self):
with tempfile.TemporaryDirectory() as tmpdir:
downloads = Path(tmpdir) / "downloads"
downloads.mkdir()
with self.assertRaises(HTTPException) as context:
_resolve_download_path(str(downloads), "/etc/passwd")
self.assertEqual(context.exception.status_code, 403)
@unittest.skipUnless(hasattr(os, "symlink"), "os.symlink is not available")
def test_resolve_download_path_blocks_symlink_escape(self):
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir)
downloads = root / "downloads"
outside = root / "outside"
downloads.mkdir()
outside.mkdir()
(outside / "secret.txt").write_text("secret")
try:
os.symlink(outside, downloads / "linked")
except OSError as e:
self.skipTest(f"Unable to create symlink: {e}")
with self.assertRaises(HTTPException) as context:
_resolve_download_path(str(downloads), "linked/secret.txt")
self.assertEqual(context.exception.status_code, 403)