From 9437e64936af87ff84ed0a4b5f7273e596bcf4a2 Mon Sep 17 00:00:00 2001 From: Ville Rantanen Date: Sun, 23 Jul 2023 20:23:48 +0300 Subject: [PATCH] use markdown2, add forced login. Added breadcrumbs --- captcha.py | 13 +- db/sqlite.py | 427 +++++++++++++------- init_sqlite.sh | 6 +- main.py | 772 +++++++++++++++++++++---------------- minimd.py | 50 ++- password.py | 4 +- requirements.txt | 1 + schema.txt | 3 +- templates/admin/index.html | 4 + templates/comments.html | 2 +- templates/forum.html | 2 +- templates/thread.html | 1 + tool.py | 13 +- 13 files changed, 771 insertions(+), 527 deletions(-) diff --git a/captcha.py b/captcha.py index eea5287..0736c82 100644 --- a/captcha.py +++ b/captcha.py @@ -1,20 +1,25 @@ from random import randint import hashlib, base64 + # FIXME hash can be reused def generate(key): - ''' + """ Generate a simple CAPTCHA. It is based on a simple math expression which stops the simplest of bots. - ''' + """ # The parameters are chosen such that they are simple to solve on paper. a = randint(1, 10) b = randint(1, 10) c = randint(10, 20) - return f'{a} * {b} + {c} = ', _hash_answer(key, str(a * b + c)) + return f"{a} * {b} + {c} = ", _hash_answer(key, str(a * b + c)) + def verify(key, answer, hash): return _hash_answer(key, answer) == hash + def _hash_answer(key, answer): - return base64.b64encode(hashlib.sha256((key + answer).encode('utf-8')).digest()).decode('ascii') + return base64.b64encode( + hashlib.sha256((key + answer).encode("utf-8")).digest() + ).decode("ascii") diff --git a/db/sqlite.py b/db/sqlite.py index c13fe36..9115374 100644 --- a/db/sqlite.py +++ b/db/sqlite.py @@ -1,18 +1,25 @@ import sqlite3 + class DB: def __init__(self, conn): self.conn = conn pass def get_config(self): - return self._db().execute(''' - select version, name, description, secret_key, captcha_key, registration_enabled from config - ''' - ).fetchone() + return ( + self._db() + .execute( + """ + select version, name, description, secret_key, captcha_key, registration_enabled, login_required from config + """ + ) + .fetchone() + ) def get_forums(self): - return self._db().execute(''' + return self._db().execute( + """ select f.forum_id, name, description, thread_id, title, update_time from forums f left join threads t @@ -23,20 +30,41 @@ class DB: order by update_time desc limit 1 ) - ''' + """ ) def get_forum(self, forum_id): - return self._db().execute(''' + return ( + self._db() + .execute( + """ select name, description from forums where forum_id = ? - ''', - (forum_id,) - ).fetchone() + """, + (forum_id,), + ) + .fetchone() + ) + + def get_thread_forum(self, thread_id): + """ Returns forum_id of a thread """ + return ( + self._db() + .execute( + """ + select forum_id + from threads + where thread_id = ? + """, + (thread_id,), + ) + .fetchone()[0] + ) def get_threads(self, forum_id, offset, limit, user_id): - return self._db().execute(''' + return self._db().execute( + """ select t.thread_id, title, @@ -70,20 +98,22 @@ class DB: order by t.update_time desc limit ? offset ? - ''', - (forum_id, user_id, limit, offset) + """, + (forum_id, user_id, limit, offset), ) def get_thread(self, thread): db = self._db() - title, text, author, author_id, create_time, modify_time, hidden = db.execute(''' - select title, text, name, author_id, create_time, modify_time, hidden + title, text, author, author_id, create_time, modify_time, hidden, forum_id = db.execute( + """ + select title, text, name, author_id, create_time, modify_time, hidden, forum_id from threads, users where thread_id = ? and author_id = user_id - ''', - (thread,) + """, + (thread,), ).fetchone() - comments = db.execute(''' + comments = db.execute( + """ select comment_id, parent_id, @@ -97,59 +127,91 @@ class DB: left join users on author_id = user_id where thread_id = ? - ''', - (thread,) + """, + (thread,), + ) + return ( + title, + text, + author, + author_id, + create_time, + modify_time, + comments, + hidden, + forum_id ) - return title, text, author, author_id, create_time, modify_time, comments, hidden def get_thread_title(self, thread_id): - return self._db().execute(''' + return ( + self._db() + .execute( + """ select title from threads where thread_id = ? - ''', - (thread_id,) - ).fetchone() + """, + (thread_id,), + ) + .fetchone() + ) def get_thread_title_text(self, thread_id): - return self._db().execute(''' + return ( + self._db() + .execute( + """ select title, text from threads where thread_id = ? - ''', - (thread_id,) - ).fetchone() + """, + (thread_id,), + ) + .fetchone() + ) def get_recent_threads(self, limit): - return self._db().execute(''' + return self._db().execute( + """ select thread_id, title, modify_date from threads order by modify_date limit ? - ''', - (limit,) + """, + (limit,), ) def get_comment(self, comment_id): - return self._db().execute(''' + return ( + self._db() + .execute( + """ select title, c.text from comments c, threads t where comment_id = ? and c.thread_id = t.thread_id - ''', - (comment_id,) - ).fetchone() + """, + (comment_id,), + ) + .fetchone() + ) def get_subcomments(self, comment_id): db = self._db() - thread_id, parent_id, title = db.execute(''' + thread_id, parent_id, title = db.execute( + """ select threads.thread_id, parent_id, title from threads, comments where comment_id = ? and threads.thread_id = comments.thread_id - ''', - (comment_id,) + """, + (comment_id,), ).fetchone() # Recursive CTE, see https://www.sqlite.org/lang_with.html - return thread_id, parent_id, title, db.execute(''' + return ( + thread_id, + parent_id, + title, + db.execute( + """ with recursive descendant_of(id) as ( select comment_id from comments where comment_id = ? @@ -171,112 +233,148 @@ class DB: users where id = comment_id and user_id = author_id - ''', - (comment_id,) + """, + (comment_id,), + ), ) def get_user_password(self, username): - return self._db().execute(''' + return ( + self._db() + .execute( + """ select user_id, password from users where name = lower(?) - ''', - (username,) - ).fetchone() + """, + (username,), + ) + .fetchone() + ) def get_user_password_by_id(self, user_id): - return self._db().execute(''' + return ( + self._db() + .execute( + """ select password from users where user_id = ? - ''', - (user_id,) - ).fetchone() + """, + (user_id,), + ) + .fetchone() + ) def set_user_password(self, user_id, password): - return self.change_one(''' + return self.change_one( + """ update users set password = ? where user_id = ? - ''', - (password, user_id) + """, + (password, user_id), ) def get_user_public_info(self, user_id): - return self._db().execute(''' + return ( + self._db() + .execute( + """ select name, about, banned_until from users where user_id = ? - ''', - (user_id,) - ).fetchone() + """, + (user_id,), + ) + .fetchone() + ) def get_user_private_info(self, user_id): - return self._db().execute(''' + return ( + self._db() + .execute( + """ select about from users where user_id = ? - ''', - (user_id,) - ).fetchone() + """, + (user_id,), + ) + .fetchone() + ) def set_user_private_info(self, user_id, about): db = self._db() - db.execute(''' + db.execute( + """ update users set about = ? where user_id = ? - ''', - (about, user_id) + """, + (about, user_id), ) db.commit() def get_user_name_role_banned(self, user_id): - return self._db().execute(''' + return ( + self._db() + .execute( + """ select name, role, banned_until from users where user_id = ? - ''', - (user_id,) - ).fetchone() + """, + (user_id,), + ) + .fetchone() + ) def get_user_name(self, user_id): - return self._db().execute(''' + return ( + self._db() + .execute( + """ select name from users where user_id = ? - ''', - (user_id,) - ).fetchone() + """, + (user_id,), + ) + .fetchone() + ) def add_thread(self, author_id, forum_id, title, text, time): db = self._db() c = db.cursor() - c.execute(''' + c.execute( + """ insert into threads (author_id, forum_id, title, text, create_time, modify_time, update_time) select ?, ?, ?, ?, ?, ?, ? from users where user_id = ? and banned_until < ? - ''', - (author_id, forum_id, title, text, time, time, time, author_id, time) + """, + (author_id, forum_id, title, text, time, time, time, author_id, time), ) rowid = c.lastrowid if rowid is None: return None db.commit() - return db.execute(''' + return db.execute( + """ select thread_id from threads where rowid = ? - ''', - (rowid,) + """, + (rowid,), ).fetchone() def delete_thread(self, user_id, thread_id): db = self._db() c = db.cursor() - c.execute(''' + c.execute( + """ delete from threads -- 1 = moderator, 2 = admin @@ -284,8 +382,8 @@ class DB: author_id = ? or (select 1 from users where user_id = ? and (role = 1 or role = 2)) ) - ''', - (thread_id, user_id, user_id) + """, + (thread_id, user_id, user_id), ) db.commit() return c.rowcount > 0 @@ -293,7 +391,8 @@ class DB: def delete_comment(self, user_id, comment_id): db = self._db() c = db.cursor() - c.execute(''' + c.execute( + """ delete from comments where comment_id = ? @@ -304,8 +403,8 @@ class DB: ) -- Don't allow deleting comments with children and (select 1 from comments where parent_id = ?) is null - ''', - (comment_id, user_id, user_id, comment_id) + """, + (comment_id, user_id, user_id, comment_id), ) db.commit() return c.rowcount > 0 @@ -313,21 +412,23 @@ class DB: def add_comment_to_thread(self, thread_id, author_id, text, time): db = self._db() c = db.cursor() - c.execute(''' + c.execute( + """ insert into comments(thread_id, author_id, text, create_time, modify_time) select ?, ?, ?, ?, ? from threads, users where thread_id = ? and user_id = ? and banned_until < ? - ''', - (thread_id, author_id, text, time, time, thread_id, author_id, time) + """, + (thread_id, author_id, text, time, time, thread_id, author_id, time), ) if c.rowcount > 0: - c.execute(''' + c.execute( + """ update threads set update_time = ? where thread_id = ? - ''', - (time, thread_id) + """, + (time, thread_id), ) db.commit() return True @@ -336,16 +437,18 @@ class DB: def add_comment_to_comment(self, parent_id, author_id, text, time): db = self._db() c = db.cursor() - c.execute(''' + c.execute( + """ insert into comments(thread_id, parent_id, author_id, text, create_time, modify_time) select thread_id, ?, ?, ?, ?, ? from comments, users where comment_id = ? and user_id = ? and banned_until < ? - ''', - (parent_id, author_id, text, time, time, parent_id, author_id, time) + """, + (parent_id, author_id, text, time, time, parent_id, author_id, time), ) if c.rowcount > 0: - c.execute(''' + c.execute( + """ update threads set update_time = ? where threads.thread_id = ( @@ -353,8 +456,8 @@ class DB: from comments c where comment_id = ? ) - ''', - (time, parent_id) + """, + (time, parent_id), ) db.commit() return True @@ -363,7 +466,8 @@ class DB: def modify_thread(self, thread_id, user_id, title, text, time): db = self._db() c = db.cursor() - c.execute(''' + c.execute( + """ update threads set title = ?, text = ?, modify_time = ? where thread_id = ? and ( @@ -371,13 +475,17 @@ class DB: -- 1 = moderator, 2 = admin or (select 1 from users where user_id = ? and (role = 1 or role = 2)) ) - ''', + """, ( - title, text, time, + title, + text, + time, thread_id, - user_id, user_id, time, user_id, - ) + user_id, + time, + user_id, + ), ) if c.rowcount > 0: db.commit() @@ -387,7 +495,8 @@ class DB: def modify_comment(self, comment_id, user_id, text, time): db = self._db() c = db.cursor() - c.execute(''' + c.execute( + """ update comments set text = ?, modify_time = ? where comment_id = ? and ( @@ -395,13 +504,16 @@ class DB: -- 1 = moderator, 2 = admin or (select 1 from users where user_id = ? and (role = 1 or role = 2)) ) - ''', + """, ( - text, time, + text, + time, comment_id, - user_id, user_id, time, user_id, - ) + user_id, + time, + user_id, + ), ) if c.rowcount > 0: db.commit() @@ -409,19 +521,20 @@ class DB: return False def register_user(self, username, password, time): - ''' + """ Add a user if registrations are enabled. - ''' + """ try: db = self._db() c = db.cursor() - c.execute(''' + c.execute( + """ insert into users(name, password, join_time) select lower(?), ?, ? from config where registration_enabled = 1 - ''', - (username, password, time) + """, + (username, password, time), ) if c.rowcount > 0: db.commit() @@ -429,12 +542,13 @@ class DB: # up by name. # ROWID is *probably* not always consistent (race conditions). # Ideally we get the ID immediately on insert. - return c.execute(''' + return c.execute( + """ select user_id from users where name = lower(?) - ''', - (username,) + """, + (username,), ).fetchone() return None except sqlite3.IntegrityError: @@ -442,17 +556,18 @@ class DB: return None def add_user(self, username, password, time): - ''' + """ Add a user without checking if registrations are enabled. - ''' + """ try: db = self._db() c = db.cursor() - c.execute(''' + c.execute( + """ insert into users(name, password, join_time) values (lower(?), ?, ?) - ''', - (username, password, time) + """, + (username, password, time), ) if c.rowcount > 0: db.commit() @@ -463,90 +578,102 @@ class DB: return False def get_users(self): - return self._db().execute(''' + return self._db().execute( + """ select user_id, name, join_time, role, banned_until from users - ''', + """, ) def set_forum_name(self, forum_id, name): - return self.change_one(''' + return self.change_one( + """ update forums set name = ? where forum_id = ? - ''', - (name, forum_id) + """, + (name, forum_id), ) def set_forum_description(self, forum_id, description): - return self.change_one(''' + return self.change_one( + """ update forums set description = ? where forum_id = ? - ''', - (description, forum_id) + """, + (description, forum_id), ) def add_forum(self, name, description): db = self._db() - db.execute(''' + db.execute( + """ insert into forums(name, description) values (?, ?) - ''', - (name, description) + """, + (name, description), ) db.commit() - def set_config(self, server_name, server_description, registration_enabled): - return self.change_one(''' + def set_config( + self, server_name, server_description, registration_enabled, login_required + ): + return self.change_one( + """ update config - set name = ?, description = ?, registration_enabled = ? - ''', - (server_name, server_description, registration_enabled) + set name = ?, description = ?, registration_enabled = ?, login_required = ? + """, + (server_name, server_description, registration_enabled, login_required), ) def set_config_secrets(self, secret_key, captcha_key): - return self.change_one(''' + return self.change_one( + """ update config set secret_key = ?, captcha_key = ? - ''', - (secret_key, captcha_key) + """, + (secret_key, captcha_key), ) def set_user_ban(self, user_id, until): - return self.change_one(''' + return self.change_one( + """ update users set banned_until = ? where user_id = ? - ''', - (until, user_id) + """, + (until, user_id), ) def set_user_role(self, user_id, role): - return self.change_one(''' + return self.change_one( + """ update users set role = ? where user_id = ? - ''', - (role, user_id) + """, + (role, user_id), ) def set_thread_hidden(self, thread_id, hide): - return self.change_one(''' + return self.change_one( + """ update threads set hidden = ? where thread_id = ? - ''', - (hide, thread_id) + """, + (hide, thread_id), ) def set_comment_hidden(self, comment_id, hide): - return self.change_one(''' + return self.change_one( + """ update comments set hidden = ? where comment_id = ? - ''', - (hide, comment_id) + """, + (hide, comment_id), ) def change_one(self, query, values): diff --git a/init_sqlite.sh b/init_sqlite.sh index 2709ceb..14009ea 100755 --- a/init_sqlite.sh +++ b/init_sqlite.sh @@ -35,7 +35,8 @@ $SQLITE "$1" -init schema.txt "insert into config ( description, secret_key, captcha_key, - registration_enabled + registration_enabled, + login_required ) values ( 'agreper-v0.1.1', @@ -43,7 +44,8 @@ values ( '', '$(head -c 30 /dev/urandom | base64)', '$(head -c 30 /dev/urandom | base64)', - 0 + 0, + 0 )" if [ "$2" != --no-admin ] then diff --git a/main.py b/main.py index df78fb3..d4842bb 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,4 @@ -VERSION = 'agreper-v0.1.1' +VERSION = "agreper-v0.1.1q1" # TODO put in config table THREADS_PER_PAGE = 50 @@ -12,50 +12,74 @@ from datetime import datetime import captcha, password, minimd app = Flask(__name__) -db = DB(os.getenv('DB')) +db = DB(os.getenv("DB")) # This defaults to None, which allows CSRF attacks in FireFox # and older versions of Chrome. # 'Lax' is sufficient to prevent malicious POST requests. -app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' +app.config["SESSION_COOKIE_SAMESITE"] = "Lax" + class Config: pass + + config = Config() -config.version, config.server_name, config.server_description, app.config['SECRET_KEY'], config.captcha_key, config.registration_enabled = db.get_config() +( + config.version, + config.server_name, + config.server_description, + app.config["SECRET_KEY"], + config.captcha_key, + config.registration_enabled, + config.login_required +) = db.get_config() if config.version != VERSION: - print(f'Incompatible version {config.version} (expected {VERSION})') + print(f"Incompatible version {config.version} (expected {VERSION})") sys.exit(1) + class Role: USER = 0 MODERATOR = 1 ADMIN = 2 + +@app.before_request +def before_request(): + if config.login_required: + user_id = session.get("user_id", -1) + if user_id == -1 and request.endpoint not in ("login","static"): + return redirect(url_for("login")) + + + @app.after_request def after_request(response): # This forbids other sites from embedding this site in an iframe, # preventing clickjacking attacks. - response.headers['X-Frame-Options'] = 'DENY' + response.headers["X-Frame-Options"] = "DENY" return response -@app.route('/') + +@app.route("/") def index(): return render_template( - 'index.html', - title = config.server_name, - description = config.server_description, - config = config, - user = get_user(), - forums = db.get_forums() + "index.html", + title=config.server_name, + description=config.server_description, + config=config, + user=get_user(), + forums=db.get_forums(), ) -@app.route('/forum//') + +@app.route("/forum//") def forum(forum_id): title, description = db.get_forum(forum_id) - offset = int(request.args.get('p', 0)) - user_id = session.get('user_id', -1) + offset = int(request.args.get("p", 0)) + user_id = session.get("user_id", -1) threads = [*db.get_threads(forum_id, offset, THREADS_PER_PAGE + 1, user_id)] if len(threads) == THREADS_PER_PAGE + 1: threads.pop() @@ -63,267 +87,294 @@ def forum(forum_id): else: next_page = None return render_template( - 'forum.html', - title = title, - user = get_user(), - config = config, - forum_id = forum_id, - description = description, - threads = threads, - next_page = next_page, - prev_page = max(offset - THREADS_PER_PAGE, 0) if offset > 0 else None, + "forum.html", + title=title, + user=get_user(), + config=config, + forum_id=forum_id, + description=description, + threads=threads, + next_page=next_page, + prev_page=max(offset - THREADS_PER_PAGE, 0) if offset > 0 else None, ) -@app.route('/thread//') + +@app.route("/thread//") def thread(thread_id): user = get_user() - title, text, author, author_id, create_time, modify_time, comments, hidden = db.get_thread(thread_id) + ( + title, + text, + author, + author_id, + create_time, + modify_time, + comments, + hidden, + forum_id + ) = db.get_thread(thread_id) + forum_title, _ = db.get_forum(forum_id) + comments = create_comment_tree(comments, user) return render_template( - 'thread.html', - title = title, - config = config, - user = user, - text = text, - author = author, - author_id = author_id, - thread_id = thread_id, - hidden = hidden, - create_time = create_time, - modify_time = modify_time, - comments = comments, + "thread.html", + title=title, + config=config, + user=user, + text=text, + author=author, + author_id=author_id, + thread_id=thread_id, + forum_id=forum_id, + forum_title=forum_title, + hidden=hidden, + create_time=create_time, + modify_time=modify_time, + comments=comments, ) -@app.route('/comment//') + +@app.route("/comment//") def comment(comment_id): user = get_user() thread_id, parent_id, title, comments = db.get_subcomments(comment_id) + forum_id = db.get_thread_forum(thread_id) + forum_title, _ = db.get_forum(forum_id) + comments = create_comment_tree(comments, user) - reply_comment, = comments + (reply_comment,) = comments comments = reply_comment.children reply_comment.children = [] return render_template( - 'comments.html', - title = title, - config = config, - user = user, - reply_comment = reply_comment, - comments = comments, - parent_id = parent_id, - thread_id = thread_id, + "comments.html", + title=title, + config=config, + user=user, + reply_comment=reply_comment, + comments=comments, + parent_id=parent_id, + thread_id=thread_id, + forum_id=forum_id, + forum_title=forum_title ) -@app.route('/login/', methods = ['GET', 'POST']) + +@app.route("/login/", methods=["GET", "POST"]) def login(): - if request.method == 'POST': - v = db.get_user_password(request.form['username']) + if request.method == "POST": + v = db.get_user_password(request.form["username"]) if v is not None: id, hash = v - if password.verify(request.form['password'], hash): - flash('Logged in', 'success') - session['user_id'] = id + time.sleep(0.3) + if password.verify(request.form["password"], hash): + flash("Logged in", "success") + session["user_id"] = id session.permanent = True - return redirect(url_for('index')) + return redirect(url_for("index")) else: # Sleep to reduce effectiveness of bruteforce - time.sleep(0.1) - flash('Username or password is invalid', 'error') - return render_template( - 'login.html', - title = 'Login', - config = config, - user = get_user() - ) + time.sleep(0.5) + flash("Username or password is invalid", "error") + return render_template("login.html", title="Login", config=config, user=get_user()) -@app.route('/logout/') + +@app.route("/logout/") def logout(): - session.pop('user_id') - return redirect(url_for('index')) + session.pop("user_id") + return redirect(url_for("index")) -@app.route('/user/', methods = ['GET', 'POST']) + +@app.route("/user/", methods=["GET", "POST"]) def user_edit(): user = get_user() if user is None: - return redirect(url_for('login')) + return redirect(url_for("login")) - if request.method == 'POST': - about = trim_text(request.form['about']) + if request.method == "POST": + about = trim_text(request.form["about"]) db.set_user_private_info(user.id, about) - flash('Updated profile', 'success') + flash("Updated profile", "success") else: - about, = db.get_user_private_info(user.id) + (about,) = db.get_user_private_info(user.id) return render_template( - 'user_edit.html', - title = 'Edit profile', - config = config, - user = user, - about = about + "user_edit.html", title="Edit profile", config=config, user=user, about=about ) -@app.route('/user/edit/password/', methods = ['POST']) + +@app.route("/user/edit/password/", methods=["POST"]) def user_edit_password(): - user_id = session.get('user_id') + user_id = session.get("user_id") if user_id is None: - return redirect(url_for('login')) + return redirect(url_for("login")) - new = request.form['new'] + new = request.form["new"] if len(new) < 8: - flash('New password must be at least 8 characters long', 'error') + flash("New password must be at least 8 characters long", "error") else: - hash, = db.get_user_password_by_id(user_id) - if password.verify(request.form['old'], hash): + (hash,) = db.get_user_password_by_id(user_id) + if password.verify(request.form["old"], hash): if db.set_user_password(user_id, password.hash(new)): - flash('Updated password', 'success') + flash("Updated password", "success") else: - flash('Failed to update password', 'error') + flash("Failed to update password", "error") else: - flash('Old password does not match', 'error') - return redirect(url_for('user_edit')) + flash("Old password does not match", "error") + return redirect(url_for("user_edit")) -@app.route('/user//') + +@app.route("/user//") def user_info(user_id): name, about, banned_until = db.get_user_public_info(user_id) return render_template( - 'user_info.html', - title = 'Profile', - config = config, - user = get_user(), - name = name, - id = user_id, - banned_until = banned_until, - about = about + "user_info.html", + title="Profile", + config=config, + user=get_user(), + name=name, + id=user_id, + banned_until=banned_until, + about=about, ) -@app.route('/forum//new/', methods = ['GET', 'POST']) + +@app.route("/forum//new/", methods=["GET", "POST"]) def new_thread(forum_id): - user_id = session.get('user_id') + user_id = session.get("user_id") if user_id is None and not config.registration_enabled: # Can't create a thread without an account - return redirect(url_for('login')) + return redirect(url_for("login")) - if request.method == 'POST': + if request.method == "POST": if user_id is None: # Attempt to create a user account first if register_user(True): - user_id = session['user_id'] + user_id = session["user_id"] if user_id is not None: - title, text = request.form['title'].strip(), trim_text(request.form['text']) + title, text = request.form["title"].strip(), trim_text(request.form["text"]) title = title.strip() - if title == '' or text == '': - flash('Title and text may not be empty', 'error') - return redirect(url_for('forum', forum_id = forum_id)) + if title == "" or text == "": + flash("Title and text may not be empty", "error") + return redirect(url_for("forum", forum_id=forum_id)) id = db.add_thread(user_id, forum_id, title, text, time.time_ns()) if id is None: - flash('Failed to create thread', 'error') - return redirect(url_for('forum', forum_id = forum_id)) + flash("Failed to create thread", "error") + return redirect(url_for("forum", forum_id=forum_id)) else: - id, = id - flash('Created thread', 'success') - return redirect(url_for('thread', thread_id = id)) + (id,) = id + flash("Created thread", "success") + return redirect(url_for("thread", thread_id=id)) return render_template( - 'new_thread.html', - title = 'Create new thread', - config = config, - user = get_user(), + "new_thread.html", + title="Create new thread", + config=config, + user=get_user(), ) -@app.route('/thread//confirm_delete/') + +@app.route("/thread//confirm_delete/") def confirm_delete_thread(thread_id): - title, = db.get_thread_title(thread_id) + (title,) = db.get_thread_title(thread_id) return render_template( - 'confirm_delete_thread.html', - title = 'Delete thread', - config = config, - user = get_user(), - thread_title = title, + "confirm_delete_thread.html", + title="Delete thread", + config=config, + user=get_user(), + thread_title=title, ) -@app.route('/thread//delete/', methods = ['POST']) + +@app.route("/thread//delete/", methods=["POST"]) def delete_thread(thread_id): - user_id = session.get('user_id') + user_id = session.get("user_id") if user_id is None: - return redirect(url_for('login')) + return redirect(url_for("login")) if db.delete_thread(user_id, thread_id): - flash('Thread has been deleted', 'success') + flash("Thread has been deleted", "success") else: - flash('Thread could not be removed', 'error') + flash("Thread could not be removed", "error") # TODO return 403, maybe? - return redirect(url_for('index')) + return redirect(url_for("index")) + def _add_comment_check_user(): - user_id = session.get('user_id') + user_id = session.get("user_id") if user_id is not None: return user_id if not config.registration_enabled: - flash('Registrations are not enabled. Please log in to comment', 'error') + flash("Registrations are not enabled. Please log in to comment", "error") if register_user(True): - return session['user_id'] + return session["user_id"] -@app.route('/thread//comment/', methods = ['POST']) + +@app.route("/thread//comment/", methods=["POST"]) def add_comment(thread_id): user_id = _add_comment_check_user() if user_id is not None: - text = trim_text(request.form['text']) - if text == '': - flash('Text may not be empty', 'error') + text = trim_text(request.form["text"]) + if text == "": + flash("Text may not be empty", "error") elif db.add_comment_to_thread(thread_id, user_id, text, time.time_ns()): - flash('Added comment', 'success') + flash("Added comment", "success") else: - flash('Failed to add comment', 'error') - return redirect(url_for('thread', thread_id = thread_id)) + flash("Failed to add comment", "error") + return redirect(url_for("thread", thread_id=thread_id)) -@app.route('/comment//comment/', methods = ['POST']) + +@app.route("/comment//comment/", methods=["POST"]) def add_comment_parent(comment_id): user_id = _add_comment_check_user() if user_id is not None: - text = trim_text(request.form['text']) - if text == '': - flash('Text may not be empty', 'error') + text = trim_text(request.form["text"]) + if text == "": + flash("Text may not be empty", "error") elif db.add_comment_to_comment(comment_id, user_id, text, time.time_ns()): - flash('Added comment', 'success') + flash("Added comment", "success") else: - flash('Failed to add comment', 'error') - return redirect(url_for('comment', comment_id = comment_id)) + flash("Failed to add comment", "error") + return redirect(url_for("comment", comment_id=comment_id)) -@app.route('/comment//confirm_delete/') + +@app.route("/comment//confirm_delete/") def confirm_delete_comment(comment_id): title, text = db.get_comment(comment_id) return render_template( - 'confirm_delete_comment.html', - title = 'Delete comment', - config = config, - user = get_user(), - thread_title = title, - text = text, + "confirm_delete_comment.html", + title="Delete comment", + config=config, + user=get_user(), + thread_title=title, + text=text, ) -@app.route('/comment//delete/', methods = ['POST']) + +@app.route("/comment//delete/", methods=["POST"]) def delete_comment(comment_id): - user_id = session.get('user_id') + user_id = session.get("user_id") if user_id is None: - return redirect(url_for('login')) + return redirect(url_for("login")) if db.delete_comment(user_id, comment_id): - flash('Comment has been deleted', 'success') + flash("Comment has been deleted", "success") else: - flash('Comment could not be removed', 'error') + flash("Comment could not be removed", "error") # TODO return 403, maybe? - return redirect(url_for('index')) + return redirect(url_for("index")) -@app.route('/thread//edit/', methods = ['GET', 'POST']) + +@app.route("/thread//edit/", methods=["GET", "POST"]) def edit_thread(thread_id): - user_id = session.get('user_id') + user_id = session.get("user_id") if user_id is None: - return redirect(url_for('login')) + return redirect(url_for("login")) - if request.method == 'POST': - title, text = request.form['title'].strip(), trim_text(request.form['text']) - if title == '' or text == '': - flash('Title and text may not be empty', 'error') + if request.method == "POST": + title, text = request.form["title"].strip(), trim_text(request.form["text"]) + if title == "" or text == "": + flash("Title and text may not be empty", "error") elif db.modify_thread( thread_id, user_id, @@ -331,141 +382,150 @@ def edit_thread(thread_id): text, time.time_ns(), ): - flash('Thread has been edited', 'success') + flash("Thread has been edited", "success") else: - flash('Thread could not be edited', 'error') - return redirect(url_for('thread', thread_id = thread_id)) + flash("Thread could not be edited", "error") + return redirect(url_for("thread", thread_id=thread_id)) title, text = db.get_thread_title_text(thread_id) return render_template( - 'edit_thread.html', - title = 'Edit thread', - config = config, - user = get_user(), - thread_title = title, - text = text, + "edit_thread.html", + title="Edit thread", + config=config, + user=get_user(), + thread_title=title, + text=text, ) -@app.route('/comment//edit/', methods = ['GET', 'POST']) -def edit_comment(comment_id): - user_id = session.get('user_id') - if user_id is None: - return redirect(url_for('login')) - if request.method == 'POST': - text = trim_text(request.form['text']) - if text == '': - flash('Text may not be empty', 'error') +@app.route("/comment//edit/", methods=["GET", "POST"]) +def edit_comment(comment_id): + user_id = session.get("user_id") + if user_id is None: + return redirect(url_for("login")) + + if request.method == "POST": + text = trim_text(request.form["text"]) + if text == "": + flash("Text may not be empty", "error") elif db.modify_comment( comment_id, user_id, - trim_text(request.form['text']), + trim_text(request.form["text"]), time.time_ns(), ): - flash('Comment has been edited', 'success') + flash("Comment has been edited", "success") else: - flash('Comment could not be edited', 'error') - return redirect(url_for('comment', comment_id = comment_id)) + flash("Comment could not be edited", "error") + return redirect(url_for("comment", comment_id=comment_id)) title, text = db.get_comment(comment_id) return render_template( - 'edit_comment.html', - title = 'Edit comment', - config = config, - user = get_user(), - thread_title = title, - text = text, + "edit_comment.html", + title="Edit comment", + config=config, + user=get_user(), + thread_title=title, + text=text, ) -@app.route('/register/', methods = ['GET', 'POST']) + +@app.route("/register/", methods=["GET", "POST"]) def register(): - if request.method == 'POST': - username, passwd = request.form['username'], request.form['password'] + if request.method == "POST": + username, passwd = request.form["username"], request.form["password"] if register_user(False): - return redirect(url_for('index')) + return redirect(url_for("index")) capt, answer = captcha.generate(config.captcha_key) return render_template( - 'register.html', - title = 'Register', - config = config, - user = get_user(), - captcha = capt, - answer = answer, + "register.html", + title="Register", + config=config, + user=get_user(), + captcha=capt, + answer=answer, ) -@app.route('/admin/') + +@app.route("/admin/") def admin(): chk, user = _admin_check() if not chk: return user return render_template( - 'admin/index.html', - title = 'Admin panel', - config = config, - forums = db.get_forums(), - users = db.get_users(), + "admin/index.html", + title="Admin panel", + config=config, + forums=db.get_forums(), + users=db.get_users(), ) -@app.route('/admin/query/', methods = ['GET', 'POST']) + +@app.route("/admin/query/", methods=["GET", "POST"]) def admin_query(): chk, user = _admin_check() if not chk: return user try: - rows, rowcount = db.query(request.form['q']) if request.method == 'POST' else [] + rows, rowcount = db.query(request.form["q"]) if request.method == "POST" else [] if rowcount > 0: - flash(f'{rowcount} rows changed', 'success') + flash(f"{rowcount} rows changed", "success") except Exception as e: - flash(e, 'error') + flash(e, "error") rows = [] return render_template( - 'admin/query.html', - title = 'Query', - config = config, - rows = rows, + "admin/query.html", + title="Query", + config=config, + rows=rows, ) -@app.route('/admin/forum//edit//', methods = ['POST']) + +@app.route("/admin/forum//edit//", methods=["POST"]) def admin_edit_forum(forum_id, what): chk, user = _admin_check() if not chk: return user try: - if what == 'description': - res = db.set_forum_description(forum_id, trim_text(request.form['description'])) - elif what == 'name': - res = db.set_forum_name(forum_id, request.form['name']) + if what == "description": + res = db.set_forum_description( + forum_id, trim_text(request.form["description"]) + ) + elif what == "name": + res = db.set_forum_name(forum_id, request.form["name"]) else: - flash(f'Unknown property "{what}"', 'error') + flash(f'Unknown property "{what}"', "error") res = None if res is True: - flash(f'Updated {what}', 'success') + flash(f"Updated {what}", "success") elif res is False: - flash(f'Failed to update {what}', 'error') + flash(f"Failed to update {what}", "error") except Exception as e: - flash(e, 'error') - return redirect(url_for('admin')) + flash(e, "error") + return redirect(url_for("admin")) -@app.route('/admin/forum/new/', methods = ['POST']) + +@app.route("/admin/forum/new/", methods=["POST"]) def admin_new_forum(): chk, user = _admin_check() if not chk: return user try: - db.add_forum(request.form['name'], trim_text(request.form['description'])) - flash('Added forum', 'success') + db.add_forum(request.form["name"], trim_text(request.form["description"])) + flash("Added forum", "success") except Exception as e: - flash(str(e), 'error') - return redirect(url_for('admin')) + flash(str(e), "error") + return redirect(url_for("admin")) -@app.route('/admin/config/edit/', methods = ['POST']) + +@app.route("/admin/config/edit/", methods=["POST"]) def admin_edit_config(): chk, user = _admin_check() if not chk: @@ -473,17 +533,19 @@ def admin_edit_config(): try: db.set_config( - request.form['server_name'], - trim_text(request.form['server_description']), - 'registration_enabled' in request.form, + request.form["server_name"], + trim_text(request.form["server_description"]), + "registration_enabled" in request.form, + "login_required" in request.form, ) - flash('Updated config. Refresh the page to see the changes.', 'success') + flash("Updated config. Refresh the page to see the changes.", "success") restart() except Exception as e: - flash(str(e), 'error') - return redirect(url_for('admin')) + flash(str(e), "error") + return redirect(url_for("admin")) -@app.route('/admin/config/new_secrets/', methods = ['POST']) + +@app.route("/admin/config/new_secrets/", methods=["POST"]) def admin_new_secrets(): chk, user = _admin_check() if not chk: @@ -493,38 +555,42 @@ def admin_new_secrets(): captcha_key = secrets.token_urlsafe(30) try: db.set_config_secrets(secret_key, captcha_key) - flash('Changed secrets. You will be logged out.', 'success') + flash("Changed secrets. You will be logged out.", "success") restart() except Exception as e: - flash(str(e), 'error') - return redirect(url_for('admin')) + flash(str(e), "error") + return redirect(url_for("admin")) + def ban_user(user_id): chk, user = _moderator_check() if not chk: return user - d, t = request.form['days'], request.form['time'] - d = 0 if d == '' else int(d) - h, m = (0, 0) if t == '' else map(int, t.split(':')) + d, t = request.form["days"], request.form["time"] + d = 0 if d == "" else int(d) + h, m = (0, 0) if t == "" else map(int, t.split(":")) until = time.time_ns() + (d * 24 * 60 + h * 60 + m) * (60 * 10**9) - until = min(until, 0x7fff_ffff_ffff_ffff) + until = min(until, 0x7FFF_FFFF_FFFF_FFFF) try: if db.set_user_ban(user_id, until): - flash('Banned user', 'success') + flash("Banned user", "success") else: - flash('Failed to ban user', 'error') + flash("Failed to ban user", "error") except Exception as e: - flash(str(e), 'error') + flash(str(e), "error") -@app.route('/user//ban/', methods = ['POST']) + +@app.route("/user//ban/", methods=["POST"]) def moderator_ban_user(user_id): - return ban_user(user_id) or redirect(url_for('user_info', user_id = user_id)) + return ban_user(user_id) or redirect(url_for("user_info", user_id=user_id)) -@app.route('/admin/user//ban/', methods = ['POST']) + +@app.route("/admin/user//ban/", methods=["POST"]) def admin_ban_user(user_id): - return ban_user(user_id) or redirect(url_for('admin')) + return ban_user(user_id) or redirect(url_for("admin")) + def unban_user(user_id): chk, user = _moderator_check() @@ -533,128 +599,140 @@ def unban_user(user_id): try: if db.set_user_ban(user_id, 0): - flash('Unbanned user', 'success') + flash("Unbanned user", "success") else: - flash('Failed to unban user', 'error') + flash("Failed to unban user", "error") except Exception as e: - flash(str(e), 'error') + flash(str(e), "error") -@app.route('/user//unban/', methods = ['POST']) + +@app.route("/user//unban/", methods=["POST"]) def moderator_unban_user(user_id): - return unban_user(user_id) or redirect(url_for('user_info', user_id = user_id)) + return unban_user(user_id) or redirect(url_for("user_info", user_id=user_id)) -@app.route('/admin/user//unban/', methods = ['POST']) + +@app.route("/admin/user//unban/", methods=["POST"]) def admin_unban_user(user_id): - return unban_user(user_id) or redirect(url_for('admin')) + return unban_user(user_id) or redirect(url_for("admin")) -@app.route('/admin/user/new/', methods = ['POST']) + +@app.route("/admin/user/new/", methods=["POST"]) def admin_new_user(): chk, user = _admin_check() if not chk: return user try: - name, passwd = request.form['name'], request.form['password'] - if name == '' or passwd == '': - flash('Name and password may not be empty') + name, passwd = request.form["name"], request.form["password"] + if name == "" or passwd == "": + flash("Name and password may not be empty") elif db.add_user(name, password.hash(passwd), time.time_ns()): - flash('Added user', 'success') + flash("Added user", "success") else: - flash('Failed to add user', 'error') + flash("Failed to add user", "error") except Exception as e: - flash(str(e), 'error') - return redirect(url_for('admin')) + flash(str(e), "error") + return redirect(url_for("admin")) -@app.route('/admin/user//edit/role/', methods = ['POST']) + +@app.route("/admin/user//edit/role/", methods=["POST"]) def admin_set_role(user_id): chk, user = _admin_check() if not chk: return user try: - role = request.form['role'] - if role not in ('0', '1', '2'): - flash(f'Invalid role type ({role})', 'error') + role = request.form["role"] + if role not in ("0", "1", "2"): + flash(f"Invalid role type ({role})", "error") else: db.set_user_role(user_id, role) - flash('Set user role', 'success') + flash("Set user role", "success") except Exception as e: - flash(str(e), 'error') - return redirect(url_for('admin')) + flash(str(e), "error") + return redirect(url_for("admin")) -@app.route('/admin/restart/', methods = ['POST']) + +@app.route("/admin/restart/", methods=["POST"]) def admin_restart(): chk, user = _admin_check() if not chk: return user restart() - return redirect(url_for('admin')) + return redirect(url_for("admin")) -@app.route('/thread//hide/', methods = ['POST']) + +@app.route("/thread//hide/", methods=["POST"]) def set_hide_thread(thread_id): chk, user = _moderator_check() if not chk: return user try: - hide = request.form['hide'] != '0' - hide_str = 'Hidden' if hide else 'Unhidden' + hide = request.form["hide"] != "0" + hide_str = "Hidden" if hide else "Unhidden" if db.set_thread_hidden(thread_id, hide): - flash(f'{hide_str} thread', 'success') + flash(f"{hide_str} thread", "success") else: - flash(f'Failed to {hide_str.lower()} thread', 'error') + flash(f"Failed to {hide_str.lower()} thread", "error") except Exception as e: - flash(str(e), 'error') + flash(str(e), "error") - return redirect(request.form['redirect']) + return redirect(request.form["redirect"]) -@app.route('/comment//hide/', methods = ['POST']) + +@app.route("/comment//hide/", methods=["POST"]) def set_hide_comment(comment_id): chk, user = _moderator_check() if not chk: return user try: - hide = request.form['hide'] != '0' - hide_str = 'Hidden' if hide else 'Unhidden' + hide = request.form["hide"] != "0" + hide_str = "Hidden" if hide else "Unhidden" if db.set_comment_hidden(comment_id, hide): - flash(f'{hide_str} comment', 'success') + flash(f"{hide_str} comment", "success") else: - flash(f'Failed to {hide_str.lower()} comment', 'error') + flash(f"Failed to {hide_str.lower()} comment", "error") except Exception as e: - flash(str(e), 'error') + flash(str(e), "error") + + return redirect(request.form["redirect"]) - return redirect(request.form['redirect']) # TODO can probably be a static-esque page, maybe? -@app.route('/help/') +@app.route("/help/") def help(): return render_template( - 'help.html', - title = 'Help', - user = get_user(), + "help.html", + title="Help", + user=get_user(), ) + def _moderator_check(): user = get_user() if user is None: - return False, redirect(url_for('login')) + return False, redirect(url_for("login")) if not user.is_moderator(): - return False, ('

Forbidden

', 403) + return False, ("

Forbidden

", 403) return True, user + def _admin_check(): user = get_user() if user is None: - return False, redirect(url_for('login')) + return False, redirect(url_for("login")) if not user.is_admin(): - return False, ('

Forbidden

', 403) + return False, ("

Forbidden

", 403) return True, user class Comment: - def __init__(self, id, parent_id, author_id, author, text, create_time, modify_time, hidden): + def __init__( + self, id, parent_id, author_id, author, text, create_time, modify_time, hidden + ): self.id = id self.author_id = author_id self.author = author @@ -665,14 +743,16 @@ class Comment: self.parent_id = parent_id self.hidden = hidden + def create_comment_tree(comments, user): - start = time.time(); + start = time.time() # Collect comments first, then build the tree in case we encounter a child before a parent - comment_map = { v[0]: Comment(*v) for v in comments } + comment_map = {v[0]: Comment(*v) for v in comments} root = [] # We should keep showing hidden comments if the user replied to them, directly or indirectly. # To do that, keep track of user comments, then walk up the tree and insert hidden comments. user_comments = [] + # Build tree def insert(comment): parent = comment_map.get(comment.parent_id) @@ -680,6 +760,7 @@ def create_comment_tree(comments, user): parent.children.append(comment) else: root.append(comment) + for comment in comment_map.values(): if comment.hidden and (not user or not user.is_moderator()): continue @@ -692,11 +773,13 @@ def create_comment_tree(comments, user): if c.hidden: insert(c) c = comment_map.get(c.parent_id) + # Sort each comment based on create time def sort_time(l): l.sort(key=lambda c: c.modify_time, reverse=True) for c in l: sort_time(c.children) + sort_time(root) return root @@ -717,39 +800,41 @@ class User: def is_banned(self): return self.banned_until > time.time_ns() + def get_user(): - id = session.get('user_id') + id = session.get("user_id") if id is not None: name, role, banned_until = db.get_user_name_role_banned(id) return User(id, name, role, banned_until) return None + def register_user(show_password): - username, passwd = request.form['username'], request.form['password'] + username, passwd = request.form["username"], request.form["password"] if any(c in username for c in string.whitespace): # This error is more ergonomic in case someone tries to play tricks again :) - flash('Username may not contain whitespace', 'error') + flash("Username may not contain whitespace", "error") elif len(username) < 3: - flash('Username must be at least 3 characters long', 'error') + flash("Username must be at least 3 characters long", "error") elif len(passwd) < 8: - flash('Password must be at least 8 characters long', 'error') + flash("Password must be at least 8 characters long", "error") elif not captcha.verify( config.captcha_key, - request.form['captcha'], - request.form['answer'], + request.form["captcha"], + request.form["answer"], ): - flash('CAPTCHA answer is incorrect', 'error') + flash("CAPTCHA answer is incorrect", "error") else: uid = db.register_user(username, password.hash(passwd), time.time_ns()) if uid is None: - flash('Failed to create account (username may already be taken)', 'error') + flash("Failed to create account (username may already be taken)", "error") else: - s = 'Account has been created.' + s = "Account has been created." if show_password: - s += f' Your password is {passwd} (hover to reveal).' - flash(s, 'success') - uid, = uid - session['user_id'] = uid + s += f" Your password is {passwd} (hover to reveal)." + flash(s, "success") + (uid,) = uid + session["user_id"] = uid session.permanent = True return True return False @@ -759,7 +844,7 @@ def register_user(show_password): def utility_processor(): def _format_time_delta(n, t): # Try the sane thing first - dt = (n - t) // 10 ** 9 + dt = (n - t) // 10**9 if dt < 1: return "less than a second" if dt < 2: @@ -778,70 +863,73 @@ def utility_processor(): return f"{dt // (3600 * 24)} days" # Try some very rough estimate, whatever - f = lambda x: datetime.utcfromtimestamp(x // 10 ** 9) + f = lambda x: datetime.utcfromtimestamp(x // 10**9) n, t = f(n), f(t) + def f(x, y, s): return f'{y - x} {s}{"s" if y - x > 1 else ""}' + if t.year < n.year: return f(t.year, n.year, "year") if t.month < n.month: return f(t.month, n.month, "month") - assert False, 'unreachable' + assert False, "unreachable" def format_since(t): n = time.time_ns() if n < t: - return 'in a distant future' - return _format_time_delta(n, t) + ' ago' + return "in a distant future" + return _format_time_delta(n, t) + " ago" def format_until(t): n = time.time_ns() if t <= n: - return 'in a distant past' + return "in a distant past" return _format_time_delta(t, n) def format_time(t): - return datetime.utcfromtimestamp(t / 10 ** 9).replace(microsecond=0) + return datetime.utcfromtimestamp(t / 10**9).replace(microsecond=0) def rand_password(): - ''' + """ Generate a random password. The current implementation returns 12 random lower- and uppercase alphabet characters. This gives up to `log((26 * 2) ** 12) / log(2) = ~68` bits of entropy, which should be enough for the foreseeable future. - ''' - return ''.join(string.ascii_letters[secrets.randbelow(52)] for _ in range(12)) + """ + return "".join(string.ascii_letters[secrets.randbelow(52)] for _ in range(12)) def gen_captcha(): return captcha.generate(config.captcha_key) return { - 'format_since': format_since, - 'format_time': format_time, - 'format_until': format_until, - 'minimd': minimd.html, - 'rand_password': rand_password, - 'gen_captcha': gen_captcha, + "format_since": format_since, + "format_time": format_time, + "format_until": format_until, + "minimd": minimd.html, + "rand_password": rand_password, + "gen_captcha": gen_captcha, } def restart(): - ''' + """ Shut down *all* workers and spawn new ones. This is necessary on e.g. a configuration change. Since restarting workers depends is platform-dependent this task is delegated to an external program. - ''' - r = subprocess.call(['./restart.sh']) + """ + r = subprocess.call(["./restart.sh"]) if r == 0: - flash('Restart script exited successfully', 'success') + flash("Restart script exited successfully", "success") else: - flash(f'Restart script exited with error (code {r})', 'error') + flash(f"Restart script exited with error (code {r})", "error") + def trim_text(s): - ''' + """ Because browsers LOVE \\r, trailing whitespace etc. - ''' - return s.replace('\r', '') + """ + return s.replace("\r", "") diff --git a/minimd.py b/minimd.py index 3f456b2..40a80b3 100755 --- a/minimd.py +++ b/minimd.py @@ -1,58 +1,70 @@ #!/usr/bin/env python3 import re +import markdown2 # https://stackoverflow.com/a/6041965 -RE_URL = re.compile(r'(https?://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-]))') -RE_EM = re.compile(r'\*(.*?)\*') -RE_LIST = re.compile(r'(-|[0-9]\.) .*') +RE_URL = re.compile( + r"(https?://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-]))" +) +RE_EM = re.compile(r"\*(.*?)\*") +RE_LIST = re.compile(r"(-|[0-9]\.) .*") + +RE_PLAINURL = re.compile( + r"([ |\n])(https?://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-]))[^\)]" +) def html(text): + text = RE_PLAINURL.sub(r'\1[\2](\2)', text) + return markdown2.markdown(text) + + +def html_old(text): # Replace angle brackets to prevent XSS # Also replace ampersands to prevent surprises. - text = text.replace('&', '&').replace('<', '<').replace('>', '>') + text = text.replace("&", "&").replace("<", "<").replace(">", ">") - html = ['

'] - lines = text.split('\n') + html = ["

"] + lines = text.split("\n") in_code = False in_list = False for l in lines: - if l == '': + if l == "": in_list = False if in_code: - html.append('') + html.append("") in_code = False - html.append('

') + html.append("

") continue - if l.startswith(' '): + if l.startswith(" "): in_list = False l = l[2:] if not in_code: - html.append('

')
+                html.append("
")
                 in_code = True
             html.append(l)
             continue
         if in_code:
-            html.append('
') + html.append("
") in_code = False - l = RE_EM.sub(r'\1', l) + l = RE_EM.sub(r"\1", l) l = RE_URL.sub(r'\1', l) if RE_LIST.match(l): if in_list: - html.append('
') + html.append("
") in_list = True else: in_list = False html.append(l) if in_code: - html.append('') - html.append('

') - return '\n'.join(html) + html.append("") + html.append("

") + return "\n".join(html) -if __name__ == '__main__': +if __name__ == "__main__": import sys - print(html(sys.stdin.read())) + print(html_old(sys.stdin.read())) diff --git a/password.py b/password.py index 34583c0..8728485 100644 --- a/password.py +++ b/password.py @@ -1,9 +1,9 @@ import passlib.hash + def hash(password): return passlib.hash.argon2.hash(password) + def verify(password, hash): return passlib.hash.argon2.verify(password, hash) - - diff --git a/requirements.txt b/requirements.txt index b1b03cd..cabf836 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ argon2-cffi==21.3.0 Flask==2.2.2 gunicorn==20.1.0 passlib==1.7.4 +markdown2==2.4.9 diff --git a/schema.txt b/schema.txt index 819618e..9c9fada 100644 --- a/schema.txt +++ b/schema.txt @@ -4,7 +4,8 @@ create table config ( description text not null, secret_key text not null, captcha_key text not null, - registration_enabled boolean not null + registration_enabled boolean not null, + login_required boolean not null ); create table users ( diff --git a/templates/admin/index.html b/templates/admin/index.html index e7c84f5..38c1627 100644 --- a/templates/admin/index.html +++ b/templates/admin/index.html @@ -21,6 +21,10 @@ Registration enabled + +Login required + + diff --git a/templates/comments.html b/templates/comments.html index 2e44ff4..a9675b5 100644 --- a/templates/comments.html +++ b/templates/comments.html @@ -2,7 +2,7 @@ {% from 'comment.html' import render_comment, render_comment_pre, render_comment_post, reply with context %} {% block content %} - +

« {{ forum_title }} « {{ title }}

{{ render_comment_pre(reply_comment, thread_id, comments | length == 0) }} {{ reply() }} diff --git a/templates/forum.html b/templates/forum.html index ac7c1f4..896ddc5 100644 --- a/templates/forum.html +++ b/templates/forum.html @@ -11,7 +11,7 @@ {% block content -%}

{{ minimd(description) | safe }}

-

Create thread

+

« Forum list | Create thread

{{- nav() -}} diff --git a/templates/thread.html b/templates/thread.html index 049a636..599a341 100644 --- a/templates/thread.html +++ b/templates/thread.html @@ -3,6 +3,7 @@ {%- from 'moderator.html' import moderate_thread with context %} {%- block content %} +

« {{ forum_title }}

{%- if user is not none and user.is_moderator() -%}

{{ moderate_thread(thread_id, hidden) }}

{%- endif -%} diff --git a/tool.py b/tool.py index 537ef86..592a716 100755 --- a/tool.py +++ b/tool.py @@ -2,24 +2,27 @@ import sys, password + def arg(i, s): if i < len(sys.argv): return sys.argv[i] print(s) sys.exit(1) + def arg_last(i, s): if i == len(sys.argv) - 1: return sys.argv[i] print(s) sys.exit(1) -proc = 'tool.py' if len(sys.argv) < 1 else sys.argv[0] -cmd = arg(1, f'usage: {proc} [...]') -if cmd == 'password': - pwd = arg_last(2, 'usage: {proc} password ') +proc = "tool.py" if len(sys.argv) < 1 else sys.argv[0] +cmd = arg(1, f"usage: {proc} [...]") + +if cmd == "password": + pwd = arg_last(2, "usage: {proc} password ") print(password.hash(pwd)) else: - print('unknown command ', cmd) + print("unknown command ", cmd) sys.exit(1)