Files
pwss/docker-pwss/code/utils.py
2022-08-18 15:42:27 +03:00

132 lines
3.7 KiB
Python

from flask import request, current_app as app, g, session
from datetime import datetime
import bcrypt
import os
import json
import secrets
import sqlite3
import time
import sys
# Directory that read_config() searches for "<folder>.json" config files.
# os.getenv returns None when the CONFIG_FOLDER environment variable is unset.
CONFIGS = os.getenv("CONFIG_FOLDER")
def check_password(config, pw):
    """Check a plaintext password against the hash stored in *config*.

    Both ``pw`` and ``config["password"]`` are UTF-8 encoded and handed to
    ``bcrypt.checkpw``; its result is returned unchanged.
    """
    candidate = pw.encode("utf-8")
    stored_hash = config["password"].encode("utf-8")
    return bcrypt.checkpw(candidate, stored_hash)
def check_auth(path):
    """Tell whether the current session is authenticated for *path*.

    The first component of ``path`` (split on ``os.sep``) names the folder.
    False is returned straight away when the session has no
    ``auth/<folder>`` entry; otherwise the verdict comes from
    ``has_session``.
    """
    top_level, *_ = path.split(os.sep)
    return f"auth/{top_level}" in session and has_session(top_level)
def authenticate(config, password):
    """Return success of authentication.

    ``config`` is a folder configuration dict. Authentication fails when it
    has no ``"name"``, when its expiry lies in the past, when it has no
    ``"password"`` entry, or when ``password`` does not match. On success a
    new session is registered through ``set_session`` and True is returned.

    An ``"expires"`` value of ``"never"`` is treated as "now plus
    ``app.config["SESSION_EXPIRY"]``"; any other value is parsed with
    ``datetime.fromisoformat``.
    """
    if "name" not in config:
        return False

    folder = config["name"]
    session_key = f"auth/{folder}"

    expires = config["expires"]
    if expires == "never":
        expiration = time.time() + app.config["SESSION_EXPIRY"]
    else:
        expiration = datetime.fromisoformat(expires).timestamp()

    # Any earlier login for this folder is dropped before the new attempt
    # is evaluated, so a failed attempt also logs the folder out.
    if session_key in session:
        session.pop(session_key)

    if expiration <= time.time():
        return False
    if "password" not in config or not check_password(config, password):
        return False

    set_session(folder, max_expiration=expiration)
    return True
def get_db():
    """Return the SQLite connection cached on Flask's ``g`` object.

    The connection is stored as ``g._database``. When none is present yet,
    one is opened against ``app.config["DATABASE"]`` and cached.
    """
    connection = getattr(g, "_database", None)
    if connection is not None:
        return connection
    connection = sqlite3.connect(app.config["DATABASE"])
    g._database = connection
    return connection
def get_valid_sessions():
    """List the unexpired sessions that belong to the current visitor.

    A database row counts when its expiry is in the future, its IP equals
    ``get_ip()``, and its token is one of the values stored under an
    ``auth/...`` key of the Flask session.

    Returns:
        A list of ``(folder, minutes_left, days_left)`` tuples, where
        ``minutes_left`` is the remaining session time in whole minutes and
        ``days_left`` is taken from ``read_config(folder)`` (``"end of"``
        when the config has no such entry). An empty list is returned when
        anything goes wrong; the error is printed to stderr.
    """
    # TODO, validate that sessions have the token correct too.
    tokens = [session[key] for key in session if key.startswith("auth/")]
    if not tokens:
        # No row can match without a token, so skip the database round trip.
        return []

    query = "SELECT folder, expire, token FROM sessions WHERE expire > ? AND ip = ?"
    args = (int(time.time()), get_ip())
    try:
        cur = get_db().execute(query, args)
        try:
            rows = cur.fetchall()
        finally:
            # Release the cursor even when fetching fails.
            cur.close()
        return [
            (
                folder,
                int((expire - time.time()) / 60),
                read_config(folder).get("days_left", "end of"),
            )
            for folder, expire, token in rows
            if token in tokens
        ]
    except Exception as e:
        # Best effort: a broken database or config must not break the page.
        print(e, file=sys.stderr)
        return []
def has_session(folder):
    """Return True when the visitor holds a live session for *folder*.

    The token stored in the Flask session under ``auth/<folder>`` must match
    a row of the ``sessions`` table with the same folder, an expiry in the
    future, and the IP reported by ``get_ip()``.

    Returns False when the session has no token for the folder, when no row
    matches, or when the lookup raises (the error is printed to stderr).
    """
    token = session.get(f"auth/{folder}")
    if token is None:
        # Nothing to look up; avoids a KeyError for callers that did not
        # check the session first.
        return False

    query = "SELECT count(token) FROM sessions WHERE folder = ? AND expire > ? AND token = ? AND ip = ?"
    args = (folder, int(time.time()), token, get_ip())
    try:
        cur = get_db().execute(query, args)
        try:
            is_valid = cur.fetchall()[0][0] > 0
        finally:
            # Release the cursor even when fetching fails.
            cur.close()
    except Exception as e:
        print(e, file=sys.stderr)
        return False
    return is_valid
def set_session(folder, max_expiration=None):
    """Create a session for *folder* and remember its token in the cookie.

    A random token is inserted into the ``sessions`` table together with the
    folder, the expiry timestamp and the IP from ``get_ip()``; the token is
    then stored in the Flask session under ``auth/<folder>``.

    Args:
        folder: Name of the folder the session grants access to.
        max_expiration: Optional upper bound (Unix timestamp) for the
            expiry. The session normally lasts
            ``app.config["SESSION_EXPIRY"]`` seconds from now; when a bound
            is given, the earlier of the two is used.
    """
    token = secrets.token_hex(16)
    query = "INSERT INTO sessions (folder, expire, token, ip) VALUES (?,?,?,?)"
    expiry_time = int(app.config["SESSION_EXPIRY"] + time.time())
    # Compare against None so that a bound of 0 is still honoured instead of
    # being mistaken for "no bound".
    if max_expiration is not None:
        expiry_time = int(min(max_expiration, expiry_time))
    args = (folder, expiry_time, token, get_ip())
    db = get_db()
    cur = db.execute(query, args)
    cur.close()
    db.commit()
    session[f"auth/{folder}"] = token
def get_ip():
    """Return the client IP address used to bind sessions.

    The first entry of the ``X-Forwarded-For`` header is preferred. When the
    header is missing, empty or starts with a blank entry, the result is
    ``request.remote_addr``, or ``"127.0.0.1"`` if that is unset too.
    """
    # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
    # reverse proxy overwrites it - confirm the deployment guarantees that.
    fallback = request.remote_addr or "127.0.0.1"
    forwarded = request.environ.get("HTTP_X_FORWARDED_FOR", "")
    first_hop = forwarded.split(",")[0].strip()
    return first_hop or fallback
def read_config(path):
    """Load the JSON config of the top-level folder named in *path*.

    The first component of ``path`` (split on ``os.sep``) selects the file
    ``<CONFIGS>/<folder>.json``. Two keys are added to the loaded dict:

    * ``"name"``: the folder name.
    * ``"days_left"``: days until ``config["expires"]``, rounded to one
      decimal, or the string ``"end of"`` when ``"expires"`` is missing, is
      not a string, or is not an ISO date (for example ``"never"``).

    Returns:
        The augmented config dict, or ``{}`` when the file does not exist or
        ``path`` has no ``split`` method (for example ``None``).
    """
    try:
        rootdir = path.split(os.sep)[0]
        config_file = os.path.join(CONFIGS, f"{rootdir}.json")
        # JSON is UTF-8 by specification; do not depend on the locale.
        with open(config_file, "rt", encoding="utf-8") as fp:
            config = json.load(fp)
        config["name"] = rootdir
        try:
            expires_at = datetime.fromisoformat(config["expires"]).timestamp()
            config["days_left"] = round((expires_at - time.time()) / 86400, 1)
        except (KeyError, TypeError, ValueError):
            # Missing, non-string or non-ISO expiry: no countdown available.
            config["days_left"] = "end of"
        return config
    except (FileNotFoundError, AttributeError):
        return {}