from version import VERSION # TODO put in config table THREADS_PER_PAGE = 50 from flask import Flask, render_template, session, request, redirect, url_for, flash, g from db.sqlite import DB import os, sys, subprocess import passlib.hash, secrets import time import string from datetime import datetime import captcha, password, minimd app = Flask(__name__) db = DB(os.getenv("DB")) # This defaults to None, which allows CSRF attacks in FireFox # and older versions of Chrome. # 'Lax' is sufficient to prevent malicious POST requests. app.config["SESSION_COOKIE_SAMESITE"] = "Lax" class Config: pass config = Config() ( config.version, config.server_name, config.server_description, app.config["SECRET_KEY"], config.captcha_key, config.registration_enabled, config.login_required ) = db.get_config() config.user_css = os.path.exists(os.path.join(app.static_folder, 'user.css')) if config.version != VERSION: print(f"Incompatible version {config.version} (expected {VERSION})") sys.exit(1) class Role: USER = 0 MODERATOR = 1 ADMIN = 2 @app.before_request def before_request(): if config.login_required: user_id = session.get("user_id", -1) if user_id == -1 and request.endpoint not in ("login","static"): return redirect(url_for("login")) @app.after_request def after_request(response): # This forbids other sites from embedding this site in an iframe, # preventing clickjacking attacks. response.headers["X-Frame-Options"] = "DENY" return response @app.route("/") def index(): return render_template( "index.html", title=config.server_name, description=config.server_description, config=config, user=get_user(), forums=db.get_forums(), ) @app.route("/forum//") def forum(forum_id): title, description = db.get_forum(forum_id) offset = int(request.args.get("p", 0)) user_id = session.get("user_id", -1) threads = [*db.get_threads(forum_id, offset, THREADS_PER_PAGE + 1, user_id)] if len(threads) == THREADS_PER_PAGE + 1: threads.pop() next_page = offset + THREADS_PER_PAGE else: next_page = None return render_template( "forum.html", title=title, user=get_user(), config=config, forum_id=forum_id, description=description, threads=threads, next_page=next_page, prev_page=max(offset - THREADS_PER_PAGE, 0) if offset > 0 else None, ) @app.route("/thread//") def thread(thread_id): user = get_user() ( title, text, author, author_id, create_time, modify_time, comments, hidden, forum_id ) = db.get_thread(thread_id) forum_title, _ = db.get_forum(forum_id) comments = create_comment_tree(comments, user) return render_template( "thread.html", title=title, config=config, user=user, text=text, author=author, author_id=author_id, thread_id=thread_id, forum_id=forum_id, forum_title=forum_title, hidden=hidden, create_time=create_time, modify_time=modify_time, comments=comments, ) @app.route("/comment//") def comment(comment_id): user = get_user() thread_id, parent_id, title, comments = db.get_subcomments(comment_id) forum_id = db.get_thread_forum(thread_id) forum_title, _ = db.get_forum(forum_id) comments = create_comment_tree(comments, user) (reply_comment,) = comments comments = reply_comment.children reply_comment.children = [] return render_template( "comments.html", title=title, config=config, user=user, reply_comment=reply_comment, comments=comments, parent_id=parent_id, thread_id=thread_id, forum_id=forum_id, forum_title=forum_title ) @app.route("/login/", methods=["GET", "POST"]) def login(): if request.method == "POST": v = db.get_user_password(request.form["username"]) if v is not None: id, hash = v time.sleep(0.3) if password.verify(request.form["password"], hash): flash("Logged in", "success") session["user_id"] = id session.permanent = True return redirect(url_for("index")) else: # Sleep to reduce effectiveness of bruteforce time.sleep(0.5) flash("Username or password is invalid", "error") return render_template("login.html", title="Login", config=config, user=get_user()) @app.route("/logout/") def logout(): session.pop("user_id") return redirect(url_for("index")) @app.route("/user/", methods=["GET", "POST"]) def user_edit(): user = get_user() if user is None: return redirect(url_for("login")) if request.method == "POST": about = trim_text(request.form["about"]) db.set_user_private_info(user.id, about) flash("Updated profile", "success") else: (about,) = db.get_user_private_info(user.id) return render_template( "user_edit.html", title="Edit profile", config=config, user=user, about=about ) @app.route("/user/edit/password/", methods=["POST"]) def user_edit_password(): user_id = session.get("user_id") if user_id is None: return redirect(url_for("login")) new = request.form["new"] if len(new) < 8: flash("New password must be at least 8 characters long", "error") else: (hash,) = db.get_user_password_by_id(user_id) if password.verify(request.form["old"], hash): if db.set_user_password(user_id, password.hash(new)): flash("Updated password", "success") else: flash("Failed to update password", "error") else: flash("Old password does not match", "error") return redirect(url_for("user_edit")) @app.route("/user//") def user_info(user_id): name, about, banned_until = db.get_user_public_info(user_id) return render_template( "user_info.html", title="Profile", config=config, user=get_user(), name=name, id=user_id, banned_until=banned_until, about=about, ) @app.route("/forum//new/", methods=["GET", "POST"]) def new_thread(forum_id): user_id = session.get("user_id") if user_id is None and not config.registration_enabled: # Can't create a thread without an account return redirect(url_for("login")) if request.method == "POST": if user_id is None: # Attempt to create a user account first if register_user(True): user_id = session["user_id"] if user_id is not None: title, text = request.form["title"].strip(), trim_text(request.form["text"]) title = title.strip() if title == "" or text == "": flash("Title and text may not be empty", "error") return redirect(url_for("forum", forum_id=forum_id)) id = db.add_thread(user_id, forum_id, title, text, time.time_ns()) if id is None: flash("Failed to create thread", "error") return redirect(url_for("forum", forum_id=forum_id)) else: (id,) = id flash("Created thread", "success") return redirect(url_for("thread", thread_id=id)) return render_template( "new_thread.html", title="Create new thread", config=config, user=get_user(), ) @app.route("/thread//confirm_delete/") def confirm_delete_thread(thread_id): (title,) = db.get_thread_title(thread_id) return render_template( "confirm_delete_thread.html", title="Delete thread", config=config, user=get_user(), thread_title=title, ) @app.route("/thread//delete/", methods=["POST"]) def delete_thread(thread_id): user_id = session.get("user_id") if user_id is None: return redirect(url_for("login")) if db.delete_thread(user_id, thread_id): flash("Thread has been deleted", "success") else: flash("Thread could not be removed", "error") # TODO return 403, maybe? return redirect(url_for("index")) def _add_comment_check_user(): user_id = session.get("user_id") if user_id is not None: return user_id if not config.registration_enabled: flash("Registrations are not enabled. Please log in to comment", "error") if register_user(True): return session["user_id"] @app.route("/thread//comment/", methods=["POST"]) def add_comment(thread_id): user_id = _add_comment_check_user() if user_id is not None: text = trim_text(request.form["text"]) if text == "": flash("Text may not be empty", "error") elif db.add_comment_to_thread(thread_id, user_id, text, time.time_ns()): flash("Added comment", "success") else: flash("Failed to add comment", "error") return redirect(url_for("thread", thread_id=thread_id)) @app.route("/comment//comment/", methods=["POST"]) def add_comment_parent(comment_id): user_id = _add_comment_check_user() if user_id is not None: text = trim_text(request.form["text"]) if text == "": flash("Text may not be empty", "error") elif db.add_comment_to_comment(comment_id, user_id, text, time.time_ns()): flash("Added comment", "success") else: flash("Failed to add comment", "error") return redirect(url_for("comment", comment_id=comment_id)) @app.route("/comment//confirm_delete/") def confirm_delete_comment(comment_id): title, text = db.get_comment(comment_id) return render_template( "confirm_delete_comment.html", title="Delete comment", config=config, user=get_user(), thread_title=title, text=text, ) @app.route("/comment//delete/", methods=["POST"]) def delete_comment(comment_id): user_id = session.get("user_id") if user_id is None: return redirect(url_for("login")) if db.delete_comment(user_id, comment_id): flash("Comment has been deleted", "success") else: flash("Comment could not be removed", "error") # TODO return 403, maybe? return redirect(url_for("index")) @app.route("/thread//edit/", methods=["GET", "POST"]) def edit_thread(thread_id): user_id = session.get("user_id") if user_id is None: return redirect(url_for("login")) if request.method == "POST": title, text = request.form["title"].strip(), trim_text(request.form["text"]) if title == "" or text == "": flash("Title and text may not be empty", "error") elif db.modify_thread( thread_id, user_id, title, text, time.time_ns(), ): flash("Thread has been edited", "success") else: flash("Thread could not be edited", "error") return redirect(url_for("thread", thread_id=thread_id)) title, text = db.get_thread_title_text(thread_id) return render_template( "edit_thread.html", title="Edit thread", config=config, user=get_user(), thread_title=title, text=text, ) @app.route("/comment//edit/", methods=["GET", "POST"]) def edit_comment(comment_id): user_id = session.get("user_id") if user_id is None: return redirect(url_for("login")) if request.method == "POST": text = trim_text(request.form["text"]) if text == "": flash("Text may not be empty", "error") elif db.modify_comment( comment_id, user_id, trim_text(request.form["text"]), time.time_ns(), ): flash("Comment has been edited", "success") else: flash("Comment could not be edited", "error") return redirect(url_for("comment", comment_id=comment_id)) title, text = db.get_comment(comment_id) return render_template( "edit_comment.html", title="Edit comment", config=config, user=get_user(), thread_title=title, text=text, ) @app.route("/register/", methods=["GET", "POST"]) def register(): if request.method == "POST": username, passwd = request.form["username"], request.form["password"] if register_user(False): return redirect(url_for("index")) capt, answer = captcha.generate(config.captcha_key) return render_template( "register.html", title="Register", config=config, user=get_user(), captcha=capt, answer=answer, ) @app.route("/admin/") def admin(): chk, user = _admin_check() if not chk: return user return render_template( "admin/index.html", title="Admin panel", config=config, forums=db.get_forums(), users=db.get_users(), ) @app.route("/admin/query/", methods=["GET", "POST"]) def admin_query(): chk, user = _admin_check() if not chk: return user try: rows, rowcount = db.query(request.form["q"]) if request.method == "POST" else [] if rowcount > 0: flash(f"{rowcount} rows changed", "success") except Exception as e: flash(e, "error") rows = [] return render_template( "admin/query.html", title="Query", config=config, rows=rows, ) @app.route("/admin/forum//edit//", methods=["POST"]) def admin_edit_forum(forum_id, what): chk, user = _admin_check() if not chk: return user try: if what == "description": res = db.set_forum_description( forum_id, trim_text(request.form["description"]) ) elif what == "name": res = db.set_forum_name(forum_id, request.form["name"]) else: flash(f'Unknown property "{what}"', "error") res = None if res is True: flash(f"Updated {what}", "success") elif res is False: flash(f"Failed to update {what}", "error") except Exception as e: flash(e, "error") return redirect(url_for("admin")) @app.route("/admin/forum/new/", methods=["POST"]) def admin_new_forum(): chk, user = _admin_check() if not chk: return user try: db.add_forum(request.form["name"], trim_text(request.form["description"])) flash("Added forum", "success") except Exception as e: flash(str(e), "error") return redirect(url_for("admin")) @app.route("/admin/config/edit/", methods=["POST"]) def admin_edit_config(): chk, user = _admin_check() if not chk: return user try: db.set_config( request.form["server_name"], trim_text(request.form["server_description"]), "registration_enabled" in request.form, "login_required" in request.form, ) flash("Updated config. Refresh the page to see the changes.", "success") restart() except Exception as e: flash(str(e), "error") return redirect(url_for("admin")) @app.route("/admin/config/new_secrets/", methods=["POST"]) def admin_new_secrets(): chk, user = _admin_check() if not chk: return user secret_key = secrets.token_urlsafe(30) captcha_key = secrets.token_urlsafe(30) try: db.set_config_secrets(secret_key, captcha_key) flash("Changed secrets. You will be logged out.", "success") restart() except Exception as e: flash(str(e), "error") return redirect(url_for("admin")) def ban_user(user_id): chk, user = _moderator_check() if not chk: return user d, t = request.form["days"], request.form["time"] d = 0 if d == "" else int(d) h, m = (0, 0) if t == "" else map(int, t.split(":")) until = time.time_ns() + (d * 24 * 60 + h * 60 + m) * (60 * 10**9) until = min(until, 0x7FFF_FFFF_FFFF_FFFF) try: if db.set_user_ban(user_id, until): flash("Banned user", "success") else: flash("Failed to ban user", "error") except Exception as e: flash(str(e), "error") @app.route("/user//ban/", methods=["POST"]) def moderator_ban_user(user_id): return ban_user(user_id) or redirect(url_for("user_info", user_id=user_id)) @app.route("/admin/user//ban/", methods=["POST"]) def admin_ban_user(user_id): return ban_user(user_id) or redirect(url_for("admin")) def unban_user(user_id): chk, user = _moderator_check() if not chk: return user try: if db.set_user_ban(user_id, 0): flash("Unbanned user", "success") else: flash("Failed to unban user", "error") except Exception as e: flash(str(e), "error") @app.route("/user//unban/", methods=["POST"]) def moderator_unban_user(user_id): return unban_user(user_id) or redirect(url_for("user_info", user_id=user_id)) @app.route("/admin/user//unban/", methods=["POST"]) def admin_unban_user(user_id): return unban_user(user_id) or redirect(url_for("admin")) @app.route("/admin/user/new/", methods=["POST"]) def admin_new_user(): chk, user = _admin_check() if not chk: return user try: name, passwd = request.form["name"], request.form["password"] if name == "" or passwd == "": flash("Name and password may not be empty") elif db.add_user(name, password.hash(passwd), time.time_ns()): flash("Added user", "success") else: flash("Failed to add user", "error") except Exception as e: flash(str(e), "error") return redirect(url_for("admin")) @app.route("/admin/user//edit/role/", methods=["POST"]) def admin_set_role(user_id): chk, user = _admin_check() if not chk: return user try: role = request.form["role"] if role not in ("0", "1", "2"): flash(f"Invalid role type ({role})", "error") else: db.set_user_role(user_id, role) flash("Set user role", "success") except Exception as e: flash(str(e), "error") return redirect(url_for("admin")) @app.route("/admin/restart/", methods=["POST"]) def admin_restart(): chk, user = _admin_check() if not chk: return user restart() return redirect(url_for("admin")) @app.route("/thread//hide/", methods=["POST"]) def set_hide_thread(thread_id): chk, user = _moderator_check() if not chk: return user try: hide = request.form["hide"] != "0" hide_str = "Hidden" if hide else "Unhidden" if db.set_thread_hidden(thread_id, hide): flash(f"{hide_str} thread", "success") else: flash(f"Failed to {hide_str.lower()} thread", "error") except Exception as e: flash(str(e), "error") return redirect(request.form["redirect"]) @app.route("/comment//hide/", methods=["POST"]) def set_hide_comment(comment_id): chk, user = _moderator_check() if not chk: return user try: hide = request.form["hide"] != "0" hide_str = "Hidden" if hide else "Unhidden" if db.set_comment_hidden(comment_id, hide): flash(f"{hide_str} comment", "success") else: flash(f"Failed to {hide_str.lower()} comment", "error") except Exception as e: flash(str(e), "error") return redirect(request.form["redirect"]) # TODO can probably be a static-esque page, maybe? @app.route("/help/") def help(): return render_template( "help.html", title="Help", user=get_user(), ) def _moderator_check(): user = get_user() if user is None: return False, redirect(url_for("login")) if not user.is_moderator(): return False, ("

Forbidden

", 403) return True, user def _admin_check(): user = get_user() if user is None: return False, redirect(url_for("login")) if not user.is_admin(): return False, ("

Forbidden

", 403) return True, user class Comment: def __init__( self, id, parent_id, author_id, author, text, create_time, modify_time, hidden ): self.id = id self.author_id = author_id self.author = author self.text = text self.children = [] self.create_time = create_time self.modify_time = modify_time self.parent_id = parent_id self.hidden = hidden def create_comment_tree(comments, user): start = time.time() # Collect comments first, then build the tree in case we encounter a child before a parent comment_map = {v[0]: Comment(*v) for v in comments} root = [] # We should keep showing hidden comments if the user replied to them, directly or indirectly. # To do that, keep track of user comments, then walk up the tree and insert hidden comments. user_comments = [] # Build tree def insert(comment): parent = comment_map.get(comment.parent_id) if parent is not None: parent.children.append(comment) else: root.append(comment) for comment in comment_map.values(): if comment.hidden and (not user or not user.is_moderator()): continue insert(comment) if user and (comment.author_id == user.id and not user.is_moderator()): user_comments.append(comment) # Insert replied-to hidden comments for c in user_comments: while c is not None: if c.hidden: insert(c) c = comment_map.get(c.parent_id) # Sort each comment based on create time def sort_time(l): l.sort(key=lambda c: c.modify_time, reverse=True) for c in l: sort_time(c.children) sort_time(root) return root class User: def __init__(self, id, name, role, banned_until): self.id = id self.name = name self.role = role self.banned_until = banned_until def is_moderator(self): return self.role in (Role.ADMIN, Role.MODERATOR) def is_admin(self): return self.role == Role.ADMIN def is_banned(self): return self.banned_until > time.time_ns() def get_user(): id = session.get("user_id") if id is not None: name, role, banned_until = db.get_user_name_role_banned(id) return User(id, name, role, banned_until) return None def register_user(show_password): username, passwd = request.form["username"], request.form["password"] if any(c in username for c in string.whitespace): # This error is more ergonomic in case someone tries to play tricks again :) flash("Username may not contain whitespace", "error") elif len(username) < 3: flash("Username must be at least 3 characters long", "error") elif len(passwd) < 8: flash("Password must be at least 8 characters long", "error") elif not captcha.verify( config.captcha_key, request.form["captcha"], request.form["answer"], ): flash("CAPTCHA answer is incorrect", "error") else: uid = db.register_user(username, password.hash(passwd), time.time_ns()) if uid is None: flash("Failed to create account (username may already be taken)", "error") else: s = "Account has been created." if show_password: s += f" Your password is {passwd} (hover to reveal)." flash(s, "success") (uid,) = uid session["user_id"] = uid session.permanent = True return True return False @app.context_processor def utility_processor(): def _format_time_delta(n, t): # Try the sane thing first dt = (n - t) // 10**9 if dt < 1: return "less than a second" if dt < 2: return f"1 second" if dt < 60: return f"{dt} seconds" if dt < 119: return f"1 minute" if dt < 3600: return f"{dt // 60} minutes" if dt < 3600 * 2: return f"1 hour" if dt < 3600 * 24: return f"{dt // 3600} hours" if dt < 3600 * 24 * 31: return f"{dt // (3600 * 24)} days" # Try some very rough estimate, whatever f = lambda x: datetime.utcfromtimestamp(x // 10**9) n, t = f(n), f(t) def f(x, y, s): return f'{y - x} {s}{"s" if y - x > 1 else ""}' if t.year < n.year: return f(t.year, n.year, "year") if t.month < n.month: return f(t.month, n.month, "month") assert False, "unreachable" def format_since(t): n = time.time_ns() if n < t: return "in a distant future" return _format_time_delta(n, t) + " ago" def format_until(t): n = time.time_ns() if t <= n: return "in a distant past" return _format_time_delta(t, n) def format_time(t): return datetime.utcfromtimestamp(t / 10**9).replace(microsecond=0) def rand_password(): """ Generate a random password. The current implementation returns 12 random lower- and uppercase alphabet characters. This gives up to `log((26 * 2) ** 12) / log(2) = ~68` bits of entropy, which should be enough for the foreseeable future. """ return "".join(string.ascii_letters[secrets.randbelow(52)] for _ in range(12)) def gen_captcha(): return captcha.generate(config.captcha_key) return { "format_since": format_since, "format_time": format_time, "format_until": format_until, "minimd": minimd.html, "rand_password": rand_password, "gen_captcha": gen_captcha, } def restart(): """ Shut down *all* workers and spawn new ones. This is necessary on e.g. a configuration change. Since restarting workers depends is platform-dependent this task is delegated to an external program. """ r = subprocess.call(["./restart.sh"]) if r == 0: flash("Restart script exited successfully", "success") else: flash(f"Restart script exited with error (code {r})", "error") def trim_text(s): """ Because browsers LOVE \\r, trailing whitespace etc. """ return s.replace("\r", "")