Files
mini-flees/code/app.py
2024-01-14 10:34:46 +02:00

359 lines
9.9 KiB
Python

# -*- coding: utf-8 -*-
import logging
import os
import sys
import time
from flask import (
Flask,
jsonify,
redirect,
render_template,
request,
send_from_directory,
session,
url_for,
)
from revprox import ReverseProxied
from utils.files import (
db_add_download,
db_delete_file,
db_get_file,
db_maintenance,
db_store_file,
file_details,
file_full_path,
file_full_url,
file_list,
file_list_simple,
invalidate_upload_token,
new_upload_token,
validate_upload_token,
)
from utils.misc import (
file_date_human,
hash_password,
is_ip_allowed,
random_token,
verify_password,
)
from werkzeug.utils import secure_filename
# Root logger: INFO and above, each line prefixed with timestamp and level.
# (Plain string: the %-style placeholders are expanded by logging itself, so
# an f-string prefix is not needed here.)
logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
)
__VERSION__ = "20240114.0"
app = Flask(__name__)
# Configuration is layered: upper-case names defined in this module first,
# then values taken from FLASK_-prefixed environment variables.
app.config.from_object(__name__)
app.config.from_prefixed_env()
# NOTE(review): debug mode is forced on here regardless of the loaded
# configuration — confirm this is intended outside of development.
app.debug = True
# Raises KeyError at import time when APP_SECRET_KEY has not been configured.
app.secret_key = app.config["APP_SECRET_KEY"]
# Wrap the WSGI app with the project's reverse-proxy middleware.
app.wsgi_app = ReverseProxied(app.wsgi_app)
@app.before_request
def log_request_info():
    """Pre-request hook placeholder; it currently performs no work."""
    return None
@app.after_request
def log_the_status_code(response):
    """
    Log one line per handled request and hand the response back unchanged.

    The line contains the HTTP method, the response status code, the request
    URL and the client address (the X-Forwarded-For value when present,
    otherwise the socket peer address).
    """
    client_ip = request.environ.get("HTTP_X_FORWARDED_FOR", request.remote_addr)
    summary = f"{request.method}:{response.status_code} {request.url} {client_ip}"
    app.logger.info(summary)
    return response
@app.route("/")
def index():
    """Root URL: answer with an empty body and status 200."""
    empty_body = ""
    return empty_body, 200
@app.route('/health.html', methods=["GET",])  # fmt: skip
def health():
    """Health check: reply "OK" followed by the URL that was requested."""
    body = f"OK {request.url}"
    return body, 200
def _int_header(header_name):
    """
    Return the request header `header_name` as an int, or None when absent.

    Raises ValueError when the header is present but is not an integer.
    """
    raw_value = request.headers.get(header_name)
    if raw_value is None:
        return None
    return int(raw_value)


@app.route("/upload", methods=["PUT", "POST"])
def upload():
    r"""
    Upload a file, example CURL:
    cat file | \
    curl -fL -w "\n" -F file="@-" -X POST \
    -H "Name: my.file.ext" \
    -H "Max-Downloads: 4000" \
    -H "Expires-Days: 14" \
    -H "Password: mypass" \
    -H "Hidden: true" \
    -H "Allowed-IP: 10.0.0.1,10.0.0.2,10.1.*" \
    -H "Secret: dff789f0bbe8183d32542" \
    "$FLASK_PUBLIC_URL"/upload
    - Additionally, "Expires-Hours" can be used; it wins over "Expires-Days"
      when both are sent.
    - Max-Downloads: -1 means no upper limit
    - Instead of "Secret", a "Token" header holding an upload token can be
      used; the token is invalidated once the upload has been stored.
    When using PUT, you can upload larger files with pipes
    cat largefile | \
    curl -fL -w "\n" --upload-file - \
    -H "Name: my.file.ext" \
    -H "Max-Downloads: 4000" \
    -H "Expires-Days: 14" \
    -H "Secret: dff789f0bbe8183d32542" \
    "$FLASK_PUBLIC_URL"/upload
    Returns the file download URL

    Status codes:
    - 200: file stored, body contains the download URL
    - 400: missing/unusable Name, non-integer numeric header, invalid
      Allowed-IP list, or POST without a "file" form field
    - 401: wrong Secret or invalid Token
    """
    name = request.headers.get("Name")
    if name is None:
        return "Name required", 400
    safe_filename = secure_filename(name)

    # Authenticate: a one-time upload token takes precedence over the secret.
    upload_token = request.headers.get("Token", False)
    if upload_token:
        if not validate_upload_token(upload_token):
            return "Error", 401
    elif request.headers.get("Secret", "") != app.config["ACCESS_TOKEN"]:
        return "Error", 401

    # secure_filename() may strip everything (e.g. "../"); without a file
    # name there is nothing sensible to store.
    if not safe_filename:
        return "Invalid name", 400

    try:
        max_dl_header = _int_header("Max-Downloads")
        expires_days = _int_header("Expires-days")
        expires_hours = _int_header("Expires-hours")
    except ValueError:
        return "Max-Downloads, Expires-Days and Expires-Hours must be integers", 400

    max_dl = app.config["DEFAULT_MAX_DL"] if max_dl_header is None else max_dl_header
    now = int(time.time())
    if expires_hours is not None:
        expires = now + 3600 * expires_hours
    elif expires_days is not None:
        expires = now + 24 * 3600 * expires_days
    else:
        expires = now + int(app.config["DEFAULT_EXPIRE"])

    # An absent or empty Password header means "no password".
    raw_password = request.headers.get("Password", "")
    password = hash_password(raw_password) if raw_password else None

    hidden = None
    if "Hidden" in request.headers:
        hidden = request.headers["Hidden"].lower() == "true"

    allowed_ip = None
    if "Allowed-IP" in request.headers:
        allowed_ip = request.headers["Allowed-IP"].lower()
        if not set(allowed_ip) <= set("1234567890.*, "):
            return "IP list contains unknown characters", 400

    # Validate the POST payload before touching the disk so that a bad
    # request does not leave an empty folder behind.
    posted_file = None
    if request.method == "POST":
        posted_file = request.files.get("file")
        if not posted_file:
            return "Use the 'file' variable to upload\n", 400

    # Claim a fresh folder; mkdir fails if the name is already taken, in
    # which case another random token is tried.
    while True:
        token = random_token()
        folder = os.path.join(app.config["DATAFOLDER"], token)
        try:
            os.mkdir(folder)
        except FileExistsError:
            continue
        break
    filename = file_full_path(token, safe_filename)

    if posted_file is not None:
        posted_file.save(filename)
    else:
        # PUT: stream the raw request body to disk in chunks.
        chunk_size = 1024 * 1024 * 64  # 64 MiB
        with open(filename, "wb") as f:
            for chunk in iter(lambda: request.stream.read(chunk_size), b""):
                f.write(chunk)

    db_store_file(token, safe_filename, expires, max_dl, password, ips=allowed_ip, hidden=hidden)
    download_url = file_full_url(token, safe_filename)
    app.logger.info(f"Upload: {download_url} MaxDL:{max_dl} Exp:{file_date_human(expires)}")
    if upload_token:
        invalidate_upload_token(upload_token)
    return f"File uploaded\n{download_url}\n", 200
@app.route("/new_token", methods=["GET"])
def upload_token():
    r"""
    Create a new upload token and return it as the response body.
    The token can be sent to /upload in the "Token" header instead of the
    "Secret" header; /upload invalidates it after storing the file.
    curl -fL -w "\n" \
    -H "Expires-Days: 14" \
    -H "Secret: dff789f0bbe8183d32542" \
    "$FLASK_PUBLIC_URL"/new_token
    Without "Expires-Days" the token uses the configured DEFAULT_EXPIRE.

    Status codes:
    - 200: body is the new token
    - 400: Expires-Days is not an integer
    - 401: wrong Secret
    """
    if request.headers.get("Secret", "") != app.config["ACCESS_TOKEN"]:
        return "Error", 401
    expires = int(time.time()) + int(app.config["DEFAULT_EXPIRE"])
    if "Expires-days" in request.headers:
        try:
            days = int(request.headers["Expires-days"])
        except ValueError:
            # Malformed client input is a client error, not a server crash.
            return "Expires-Days must be an integer", 400
        expires = int(time.time()) + 24 * 3600 * days
    token = new_upload_token(expires)
    return token, 200
@app.route("/details/<token>/<name>", methods=["GET"])
def details(token, name):
    r"""
    Return the stored details of one file as JSON.
    Requires the "Secret" header; when the file has an allowed-IP list, the
    caller's address must also pass it (403 otherwise).
    curl -fL -w "\n" \
    -H "Secret: dff789f0bbe8183d3254258b33a147d580c1131f39a698c56d3f640ac8415714" \
    "$ROOTURL"/details/OdD7X0aKOGM/big_file1.ext
    """
    if request.headers.get("Secret", "") != app.config["ACCESS_TOKEN"]:
        return "Error", 401
    info = file_details(token, name)
    ip_rules = info["allowed_ip"]
    if ip_rules is not None:
        client_ip = request.environ.get("HTTP_X_FORWARDED_FOR", request.remote_addr)
        if not is_ip_allowed(ip_rules, client_ip, app):
            return "Not Allowed", 403
    return jsonify(info), 200
@app.route("/delete/<token>/<name>", methods=["GET"])
def delete_file(name, token):
    """
    Delete a file from the system

    Requires the "Secret" header (401 otherwise). Removing the file from
    disk is best-effort: the database entry is deleted and "OK" is returned
    even when the file could not be removed.
    """
    if request.headers.get("Secret", "") != app.config["ACCESS_TOKEN"]:
        return "Error", 401
    target = os.path.join(app.config["DATAFOLDER"], token, name)
    try:
        os.remove(target)
    except FileNotFoundError:
        # Nothing on disk to remove; still clear the database entry below.
        pass
    except (OSError, ValueError) as err:
        # Keep going, but leave a trace instead of hiding the failure
        # (ValueError covers paths the OS layer rejects, e.g. embedded NUL).
        app.logger.warning(f"Delete: could not remove {target}: {err}")
    db_delete_file(token, name)
    return "OK", 200
@app.route("/ls", methods=["GET"])
def ls():
    """
    List all uploaded files, one entry per line.
    Requires the "Secret" header (401 otherwise).
    """
    if request.headers.get("Secret", "") != app.config["ACCESS_TOKEN"]:
        return "Error", 401
    listing = file_list()
    return "\n".join(listing), 200
@app.route("/ls-simple", methods=["GET"])
def ls_simple():
    """
    List token/names, one entry per line.
    Requires the "Secret" header (401 otherwise).
    """
    if request.headers.get("Secret", "") != app.config["ACCESS_TOKEN"]:
        return "Error", 401
    entries = file_list_simple()
    return "\n".join(entries), 200
@app.route("/maintenance", methods=["GET"])
def maintenance():
    """
    Run database maintenance and return its result.
    Clears DB of expired entries.
    Deletes folders without DB entry
    Requires the "Secret" header (401 otherwise).
    """
    if request.headers.get("Secret", "") != app.config["ACCESS_TOKEN"]:
        return "Error", 401
    report = db_maintenance()
    return report, 200
@app.route("/d/<token>/<name>", methods=["GET"])
def download(name, token):
    """
    Download a file

    A "Password" request header, when present, is stored in the session under
    the file token before the download checks run.
    """
    supplied_password = request.headers.get("Password")
    if supplied_password is not None:
        session[token] = supplied_password
    return download_file(token, name)
@app.route("/script/mfl", methods=["GET"])
def script_mfl():
    """
    Render the "mfl" template with the access token, the public root URL
    and the application version filled in.
    Requires the "Secret" header (401 otherwise).
    """
    access_token = app.config["ACCESS_TOKEN"]
    if request.headers.get("Secret", "") != access_token:
        return "Error", 401
    template_vars = {
        "token": access_token,
        "rooturl": app.config["PUBLIC_URL"],
        "version": __VERSION__,
    }
    return render_template("mfl", **template_vars)
def _is_local_redirect(target):
    """Return True when `target` is an absolute path on this site, e.g. "/d/tok/file"."""
    if not target.startswith("/") or target.startswith("//"):
        return False
    # Browsers treat "\" like "/" and drop tabs/newlines while parsing a URL,
    # so either could turn an innocent-looking path into "//other-host".
    return "\\" not in target and not any(ch.isspace() or ord(ch) < 0x20 for ch in target)


@app.route("/login", methods=["GET", "POST"])
def login():
    """
    Password prompt for protected downloads.

    POST: store the submitted password in the session under the submitted
    file token and redirect to the submitted "redirect" target. The target
    must be a path on this site; anything else is answered with 400 so the
    form cannot be abused to bounce users to another host.
    GET: render the login form from the "name", "redirect" and "token"
    values that the download check left in the session; 400 when they are
    not all present.
    """
    if request.method == "POST":
        token = request.form["token"]
        password = request.form["password"]
        target = request.form["redirect"]
        if not _is_local_redirect(target):
            return "", 400
        session[token] = password
        return redirect(target)
    if {"name", "redirect", "token"}.issubset(session.keys()):
        return render_template(
            "login.html",
            filename=session["name"],
            redirect=session["redirect"],
            token=session["token"],
        )
    return "", 400
@app.route("/logout", methods=["GET"])
def logout():
    """Drop everything stored in the caller's session and answer "OK"."""
    session.clear()
    body = "OK"
    return body, 200
def download_file(token, name):
    """
    Send the file identified by token/name when every check passes.

    Checks, in order: the file exists on disk and in the database (404), the
    caller passes the allowed-IP list if one is set (403), the download
    limit is not reached and the file has not expired (403), and the
    password stored in the session matches if the file has one (otherwise
    the caller is redirected to the login form). A successful request is
    counted as a download before the file is sent.
    """
    full_path = file_full_path(token, name)
    if not os.path.exists(full_path):
        return "Error", 404

    record = db_get_file(token, name)
    if not record:
        return "Error", 404
    _added, expires, downloads, max_dl, password_hash, _hidden, allowed_ip = record

    if allowed_ip is not None:
        client_ip = request.environ.get("HTTP_X_FORWARDED_FOR", request.remote_addr)
        if not is_ip_allowed(allowed_ip, client_ip, app):
            return "Not Allowed", 403

    # A max_dl of -1 disables the download limit.
    if (downloads >= max_dl and max_dl > -1) or expires < time.time():
        return "Expired", 403

    if password_hash and not verify_password(session.get(token, ""), password_hash):
        # Remember what was requested so the login form can come back here.
        session["token"] = token
        session["name"] = name
        session["redirect"] = url_for("download", name=name, token=token)
        return redirect(url_for("login"))

    db_add_download(token, name)
    folder = os.path.join(app.config["DATAFOLDER"], token)
    return send_from_directory(directory=folder, path=name)
def print_debug(s):
    """Write str(s) plus a newline to stderr and flush, but only when DEBUG is set."""
    if not app.config["DEBUG"]:
        return
    line = str(s) + "\n"
    sys.stderr.write(line)
    sys.stderr.flush()
if __name__ == "__main__":
    # Executed directly: start Flask's built-in server with debug enabled.
    app.run(debug=True)