Files
qgreper/main.py
2022-10-26 18:25:15 +02:00

836 lines
26 KiB
Python

# Version tag this code expects; compared at startup with the version stored in the database
# configuration.
VERSION = 'agreper-v0.1.1'
# TODO put in config table
# Number of threads shown on one forum page.
THREADS_PER_PAGE = 50
from flask import Flask, render_template, session, request, redirect, url_for, flash, g
from db.sqlite import DB
import os, sys, subprocess
import passlib.hash, secrets
import time
import string
from datetime import datetime
import captcha, password, minimd
# The Flask application; every route in this file is registered on it.
app = Flask(__name__)
# Database handle, constructed from the value of the `DB` environment variable.
db = DB(os.getenv('DB'))
class Config:
    '''
    Plain attribute container for the server configuration.
    The attributes are assigned after construction from the values stored in the database.
    '''
# Load the configuration once at import time. The Flask secret key goes straight into the
# application config, everything else is stored on the `config` object.
config = Config()
(
    config.version,
    config.server_name,
    config.server_description,
    app.config['SECRET_KEY'],
    config.captcha_key,
    config.registration_enabled,
) = db.get_config()

# Refuse to run against a database that reports a different version.
if config.version != VERSION:
    print(f'Incompatible version {config.version} (expected {VERSION})')
    sys.exit(1)
class Role:
    '''
    Numeric user roles, ordered from least to most privileged.
    '''
    USER, MODERATOR, ADMIN = range(3)
@app.route('/')
def index():
    '''
    Front page: shows the server name, its description and the list of forums.
    '''
    context = dict(
        title = config.server_name,
        description = config.server_description,
        config = config,
        user = get_user(),
        forums = db.get_forums(),
    )
    return render_template('index.html', **context)
@app.route('/forum/<int:forum_id>/')
def forum(forum_id):
    '''
    Show one page of the thread list of a forum.

    The page is selected with the `p` query parameter, which is the offset of the first thread
    on the page. A missing or malformed `p` selects the first page.
    '''
    title, description = db.get_forum(forum_id)
    # `type = int` makes a malformed value fall back to the default instead of raising
    # ValueError. Negative offsets are clamped so the pagination arithmetic stays sane.
    offset = max(request.args.get('p', 0, type = int), 0)
    user_id = session.get('user_id', -1)
    # Ask for one thread more than fits on a page to find out whether a next page exists.
    threads = [*db.get_threads(forum_id, offset, THREADS_PER_PAGE + 1, user_id)]
    if len(threads) == THREADS_PER_PAGE + 1:
        threads.pop()
        next_page = offset + THREADS_PER_PAGE
    else:
        next_page = None
    return render_template(
        'forum.html',
        title = title,
        user = get_user(),
        config = config,
        forum_id = forum_id,
        description = description,
        threads = threads,
        next_page = next_page,
        prev_page = max(offset - THREADS_PER_PAGE, 0) if offset > 0 else None,
    )
@app.route('/thread/<int:thread_id>/')
def thread(thread_id):
    '''
    Show a thread together with the tree of its comments.
    '''
    viewer = get_user()
    (
        title, text, author, author_id,
        create_time, modify_time, rows, hidden,
    ) = db.get_thread(thread_id)
    tree = create_comment_tree(rows, viewer)
    context = dict(
        title = title,
        config = config,
        user = viewer,
        text = text,
        author = author,
        author_id = author_id,
        thread_id = thread_id,
        hidden = hidden,
        create_time = create_time,
        modify_time = modify_time,
        comments = tree,
    )
    return render_template('thread.html', **context)
@app.route('/comment/<int:comment_id>/')
def comment(comment_id):
    '''
    Show a single comment with the replies below it.
    '''
    viewer = get_user()
    thread_id, parent_id, title, rows = db.get_subcomments(comment_id)
    # The tree must consist of exactly one root (the unpacking fails otherwise): the comment
    # being viewed.
    [focus] = create_comment_tree(rows, viewer)
    # The template gets the replies separately, so detach them from the focused comment.
    replies, focus.children = focus.children, []
    context = dict(
        title = title,
        config = config,
        user = viewer,
        reply_comment = focus,
        comments = replies,
        parent_id = parent_id,
        thread_id = thread_id,
    )
    return render_template('comments.html', **context)
# Login page. GET renders the form; POST checks the submitted username and password and, on
# success, stores the user id in the session and redirects to the front page.
# NOTE(review): the indentation of this block is not visible here, so it cannot be told whether
# the `else` below pairs with `if v is not None` or with the password check. Confirm against the
# original file before restructuring.
@app.route('/login/', methods = ['GET', 'POST'])
def login():
if request.method == 'POST':
# Look up the stored id and password hash for the submitted username; may be None.
v = db.get_user_password(request.form['username'])
if v is not None:
id, hash = v
if password.verify(request.form['password'], hash):
flash('Logged in', 'success')
session['user_id'] = id
# Mark the session as permanent.
session.permanent = True
return redirect(url_for('index'))
else:
# Sleep to reduce effectiveness of bruteforce
time.sleep(0.1)
# The same message is used regardless of which part of the credentials was wrong.
flash('Username or password is invalid', 'error')
return render_template(
'login.html',
title = 'Login',
config = config,
user = get_user()
)
@app.route('/logout/')
def logout():
    '''
    Log the current user out and return to the front page.
    '''
    # Pop with a default: visiting this URL without being logged in must not raise KeyError.
    session.pop('user_id', None)
    return redirect(url_for('index'))
@app.route('/user/', methods = ['GET', 'POST'])
def user_edit():
    '''
    Profile editor of the logged-in user.
    GET shows the stored "about" text, POST replaces it with the submitted one.
    '''
    user = get_user()
    if user is None:
        return redirect(url_for('login'))
    if request.method != 'POST':
        about, = db.get_user_private_info(user.id)
    else:
        about = trim_text(request.form['about'])
        db.set_user_private_info(user.id, about)
        flash('Updated profile', 'success')
    context = dict(
        title = 'Edit profile',
        config = config,
        user = user,
        about = about,
    )
    return render_template('user_edit.html', **context)
@app.route('/user/edit/password/', methods = ['POST'])
def user_edit_password():
    '''
    Change the password of the logged-in user.
    The new password must be at least 8 characters long and the old password must match.
    '''
    user_id = session.get('user_id')
    if user_id is None:
        return redirect(url_for('login'))
    new = request.form['new']
    if len(new) >= 8:
        current_hash, = db.get_user_password_by_id(user_id)
        if not password.verify(request.form['old'], current_hash):
            flash('Old password does not match', 'error')
        elif db.set_user_password(user_id, password.hash(new)):
            flash('Updated password', 'success')
        else:
            flash('Failed to update password', 'error')
    else:
        flash('New password must be at least 8 characters long', 'error')
    return redirect(url_for('user_edit'))
@app.route('/user/<int:user_id>/')
def user_info(user_id):
    '''
    Public profile page of a user.
    '''
    name, about, banned_until = db.get_user_public_info(user_id)
    context = dict(
        title = 'Profile',
        config = config,
        user = get_user(),
        name = name,
        id = user_id,
        banned_until = banned_until,
        about = about,
    )
    return render_template('user_info.html', **context)
@app.route('/forum/<int:forum_id>/new/', methods = ['GET', 'POST'])
def new_thread(forum_id):
    '''
    Create a new thread in a forum.
    Visitors without a session may create an account in the same request, provided registration
    is enabled.
    '''
    user_id = session.get('user_id')
    if user_id is None and not config.registration_enabled:
        # Can't create a thread without an account
        return redirect(url_for('login'))
    if request.method == 'POST':
        # Attempt to create a user account first
        if user_id is None and register_user(True):
            user_id = session['user_id']
        if user_id is not None:
            title = request.form['title'].strip()
            text = trim_text(request.form['text'])
            if '' in (title, text):
                flash('Title and text may not be empty', 'error')
                return redirect(url_for('forum', forum_id = forum_id))
            created = db.add_thread(user_id, forum_id, title, text, time.time_ns())
            if created is None:
                flash('Failed to create thread', 'error')
                return redirect(url_for('forum', forum_id = forum_id))
            new_id, = created
            flash('Created thread', 'success')
            return redirect(url_for('thread', thread_id = new_id))
    context = dict(
        title = 'Create new thread',
        config = config,
        user = get_user(),
    )
    return render_template('new_thread.html', **context)
@app.route('/thread/<int:thread_id>/confirm_delete/')
def confirm_delete_thread(thread_id):
    '''
    Ask for confirmation before deleting a thread.
    '''
    thread_title, = db.get_thread_title(thread_id)
    context = dict(
        title = 'Delete thread',
        config = config,
        user = get_user(),
        thread_title = thread_title,
    )
    return render_template('confirm_delete_thread.html', **context)
@app.route('/thread/<int:thread_id>/delete/', methods = ['POST'])
def delete_thread(thread_id):
    '''
    Delete a thread on behalf of the logged-in user and report the outcome.
    '''
    user_id = session.get('user_id')
    if user_id is None:
        return redirect(url_for('login'))
    if not db.delete_thread(user_id, thread_id):
        # TODO return 403, maybe?
        outcome = ('Thread could not be removed', 'error')
    else:
        outcome = ('Thread has been deleted', 'success')
    flash(*outcome)
    return redirect(url_for('index'))
def _add_comment_check_user():
    '''
    Determine which user a new comment should be attributed to.

    Returns the id of the logged-in user. If nobody is logged in, an account is registered
    from the submitted form (when registration is enabled) and its id is returned.
    Returns `None` if there is no user to attribute the comment to; an error has been flashed
    in that case.
    '''
    user_id = session.get('user_id')
    if user_id is not None:
        return user_id
    if not config.registration_enabled:
        flash('Registrations are not enabled. Please log in to comment', 'error')
        # Stop here: falling through would register an account despite the setting.
        return None
    if register_user(True):
        return session['user_id']
    return None
@app.route('/thread/<int:thread_id>/comment/', methods = ['POST'])
def add_comment(thread_id):
    '''
    Add a top-level comment to a thread, then return to the thread.
    '''
    user_id = _add_comment_check_user()
    if user_id is not None:
        text = trim_text(request.form['text'])
        if text == '':
            outcome = ('Text may not be empty', 'error')
        elif db.add_comment_to_thread(thread_id, user_id, text, time.time_ns()):
            outcome = ('Added comment', 'success')
        else:
            outcome = ('Failed to add comment', 'error')
        flash(*outcome)
    return redirect(url_for('thread', thread_id = thread_id))
@app.route('/comment/<int:comment_id>/comment/', methods = ['POST'])
def add_comment_parent(comment_id):
    '''
    Add a reply to an existing comment, then return to that comment.
    '''
    user_id = _add_comment_check_user()
    if user_id is not None:
        text = trim_text(request.form['text'])
        if text == '':
            outcome = ('Text may not be empty', 'error')
        elif db.add_comment_to_comment(comment_id, user_id, text, time.time_ns()):
            outcome = ('Added comment', 'success')
        else:
            outcome = ('Failed to add comment', 'error')
        flash(*outcome)
    return redirect(url_for('comment', comment_id = comment_id))
@app.route('/comment/<int:comment_id>/confirm_delete/')
def confirm_delete_comment(comment_id):
    '''
    Ask for confirmation before deleting a comment.
    '''
    thread_title, text = db.get_comment(comment_id)
    context = dict(
        title = 'Delete comment',
        config = config,
        user = get_user(),
        thread_title = thread_title,
        text = text,
    )
    return render_template('confirm_delete_comment.html', **context)
@app.route('/comment/<int:comment_id>/delete/', methods = ['POST'])
def delete_comment(comment_id):
    '''
    Delete a comment on behalf of the logged-in user and report the outcome.
    '''
    user_id = session.get('user_id')
    if user_id is None:
        return redirect(url_for('login'))
    if not db.delete_comment(user_id, comment_id):
        # TODO return 403, maybe?
        outcome = ('Comment could not be removed', 'error')
    else:
        outcome = ('Comment has been deleted', 'success')
    flash(*outcome)
    return redirect(url_for('index'))
@app.route('/thread/<int:thread_id>/edit/', methods = ['GET', 'POST'])
def edit_thread(thread_id):
    '''
    Edit the title and text of a thread.
    GET shows the editor with the current contents, POST stores the submitted ones.
    '''
    user_id = session.get('user_id')
    if user_id is None:
        return redirect(url_for('login'))
    if request.method == 'POST':
        new_title = request.form['title'].strip()
        new_text = trim_text(request.form['text'])
        if '' in (new_title, new_text):
            outcome = ('Title and text may not be empty', 'error')
        elif db.modify_thread(thread_id, user_id, new_title, new_text, time.time_ns()):
            outcome = ('Thread has been edited', 'success')
        else:
            outcome = ('Thread could not be edited', 'error')
        flash(*outcome)
        return redirect(url_for('thread', thread_id = thread_id))
    thread_title, text = db.get_thread_title_text(thread_id)
    context = dict(
        title = 'Edit thread',
        config = config,
        user = get_user(),
        thread_title = thread_title,
        text = text,
    )
    return render_template('edit_thread.html', **context)
@app.route('/comment/<int:comment_id>/edit/', methods = ['GET', 'POST'])
def edit_comment(comment_id):
    '''
    Edit the text of a comment.
    GET shows the editor with the current text, POST stores the submitted one.
    '''
    user_id = session.get('user_id')
    if user_id is None:
        return redirect(url_for('login'))
    if request.method == 'POST':
        new_text = trim_text(request.form['text'])
        if new_text == '':
            outcome = ('Text may not be empty', 'error')
        elif db.modify_comment(comment_id, user_id, new_text, time.time_ns()):
            outcome = ('Comment has been edited', 'success')
        else:
            outcome = ('Comment could not be edited', 'error')
        flash(*outcome)
        return redirect(url_for('comment', comment_id = comment_id))
    thread_title, text = db.get_comment(comment_id)
    context = dict(
        title = 'Edit comment',
        config = config,
        user = get_user(),
        thread_title = thread_title,
        text = text,
    )
    return render_template('edit_comment.html', **context)
@app.route('/register/', methods = ['GET', 'POST'])
def register():
    '''
    Registration page.
    A successful POST creates the account and redirects to the front page; otherwise the form
    is shown with a fresh CAPTCHA.
    '''
    # NOTE(review): unlike the thread and comment routes, nothing here consults
    # `config.registration_enabled` - confirm whether that is intended.
    if request.method == 'POST' and register_user(False):
        return redirect(url_for('index'))
    challenge, answer = captcha.generate(config.captcha_key)
    context = dict(
        title = 'Register',
        config = config,
        user = get_user(),
        captcha = challenge,
        answer = answer,
    )
    return render_template('register.html', **context)
@app.route('/admin/')
def admin():
    '''
    Admin panel overview: lists all forums and users.
    '''
    allowed, response = _admin_check()
    if not allowed:
        return response
    context = dict(
        title = 'Admin panel',
        config = config,
        forums = db.get_forums(),
        users = db.get_users(),
    )
    return render_template('admin/index.html', **context)
@app.route('/admin/query/', methods = ['GET', 'POST'])
def admin_query():
    '''
    Run a query submitted by an admin against the database and show the resulting rows.
    GET only shows the empty query form.
    '''
    chk, user = _admin_check()
    if not chk:
        return user
    rows = []
    # Only a POST carries a query. Running the unpacking below on a GET used to raise (and
    # flash) a spurious error on every page view.
    if request.method == 'POST':
        try:
            rows, rowcount = db.query(request.form['q'])
            if rowcount > 0:
                flash(f'{rowcount} rows changed', 'success')
        except Exception as e:
            # Flash the message text, not the exception object, like the rest of this file.
            flash(str(e), 'error')
            rows = []
    return render_template(
        'admin/query.html',
        title = 'Query',
        config = config,
        rows = rows,
    )
@app.route('/admin/forum/<int:forum_id>/edit/<string:what>/', methods = ['POST'])
def admin_edit_forum(forum_id, what):
    '''
    Change one property of a forum.
    `what` selects the property and must be either `description` or `name`.
    '''
    chk, user = _admin_check()
    if not chk:
        return user
    try:
        if what == 'description':
            res = db.set_forum_description(forum_id, trim_text(request.form['description']))
        elif what == 'name':
            res = db.set_forum_name(forum_id, request.form['name'])
        else:
            flash(f'Unknown property "{what}"', 'error')
            # Neither True nor False, so no second message is flashed below.
            res = None
        if res is True:
            flash(f'Updated {what}', 'success')
        elif res is False:
            flash(f'Failed to update {what}', 'error')
    except Exception as e:
        # Flash the message text, not the exception object, like the rest of this file.
        flash(str(e), 'error')
    return redirect(url_for('admin'))
@app.route('/admin/forum/new/', methods = ['POST'])
def admin_new_forum():
    '''
    Create a forum from the submitted name and description.
    '''
    allowed, response = _admin_check()
    if not allowed:
        return response
    try:
        name = request.form['name']
        description = trim_text(request.form['description'])
        db.add_forum(name, description)
        flash('Added forum', 'success')
    except Exception as err:
        flash(str(err), 'error')
    return redirect(url_for('admin'))
@app.route('/admin/config/edit/', methods = ['POST'])
def admin_edit_config():
    '''
    Store a new server name, description and registration setting, then restart the workers.
    '''
    allowed, response = _admin_check()
    if not allowed:
        return response
    try:
        server_name = request.form['server_name']
        server_description = trim_text(request.form['server_description'])
        # The registration setting counts as enabled when the field is present in the form.
        registration_enabled = 'registration_enabled' in request.form
        db.set_config(server_name, server_description, registration_enabled)
        flash('Updated config. Refresh the page to see the changes.', 'success')
        restart()
    except Exception as err:
        flash(str(err), 'error')
    return redirect(url_for('admin'))
@app.route('/admin/config/new_secrets/', methods = ['POST'])
def admin_new_secrets():
    '''
    Generate and store a new secret key and CAPTCHA key, then restart the workers.
    '''
    allowed, response = _admin_check()
    if not allowed:
        return response
    secret_key, captcha_key = (secrets.token_urlsafe(30) for _ in range(2))
    try:
        db.set_config_secrets(secret_key, captcha_key)
        flash('Changed secrets. You will be logged out.', 'success')
        restart()
    except Exception as err:
        flash(str(err), 'error')
    return redirect(url_for('admin'))
def ban_user(user_id):
    '''
    Ban a user for the duration given by the `days` and `time` (`HH:MM`) form fields.

    Returns a response if the current user may not ban anyone, `None` otherwise. In the latter
    case the outcome has been flashed and the caller decides where to redirect to.
    '''
    chk, user = _moderator_check()
    if not chk:
        return user
    d, t = request.form['days'], request.form['time']
    try:
        days = 0 if d == '' else int(d)
        hours, minutes = (0, 0) if t == '' else map(int, t.split(':'))
    except ValueError:
        # Covers non-numeric input as well as a time that is not exactly `HH:MM`.
        flash('Invalid ban duration', 'error')
        return None
    until = time.time_ns() + (days * 24 * 60 + hours * 60 + minutes) * (60 * 10**9)
    # Clamp to the largest signed 64-bit value.
    until = min(until, 0x7fff_ffff_ffff_ffff)
    try:
        if db.set_user_ban(user_id, until):
            flash('Banned user', 'success')
        else:
            flash('Failed to ban user', 'error')
    except Exception as e:
        flash(str(e), 'error')
    return None
@app.route('/user/<int:user_id>/ban/', methods = ['POST'])
def moderator_ban_user(user_id):
    '''
    Ban a user from their profile page and return to it.
    '''
    response = ban_user(user_id)
    if response:
        return response
    return redirect(url_for('user_info', user_id = user_id))
@app.route('/admin/user/<int:user_id>/ban/', methods = ['POST'])
def admin_ban_user(user_id):
    '''
    Ban a user from the admin panel and return to it.
    '''
    response = ban_user(user_id)
    if response:
        return response
    return redirect(url_for('admin'))
def unban_user(user_id):
    '''
    Lift the ban of a user by resetting the ban timestamp to 0.

    Returns a response if the current user may not unban anyone, `None` otherwise.
    '''
    allowed, response = _moderator_check()
    if not allowed:
        return response
    try:
        if not db.set_user_ban(user_id, 0):
            flash('Failed to unban user', 'error')
        else:
            flash('Unbanned user', 'success')
    except Exception as err:
        flash(str(err), 'error')
@app.route('/user/<int:user_id>/unban/', methods = ['POST'])
def moderator_unban_user(user_id):
    '''
    Unban a user from their profile page and return to it.
    '''
    response = unban_user(user_id)
    if response:
        return response
    return redirect(url_for('user_info', user_id = user_id))
@app.route('/admin/user/<int:user_id>/unban/', methods = ['POST'])
def admin_unban_user(user_id):
    '''
    Unban a user from the admin panel and return to it.
    '''
    response = unban_user(user_id)
    if response:
        return response
    return redirect(url_for('admin'))
@app.route('/admin/user/new/', methods = ['POST'])
def admin_new_user():
    '''
    Create a user account from the admin panel.
    Both the name and the password must be non-empty.
    '''
    chk, user = _admin_check()
    if not chk:
        return user
    try:
        name, passwd = request.form['name'], request.form['password']
        if name == '' or passwd == '':
            # Flashed with the 'error' category like every other failure message in this file.
            flash('Name and password may not be empty', 'error')
        elif db.add_user(name, password.hash(passwd), time.time_ns()):
            flash('Added user', 'success')
        else:
            flash('Failed to add user', 'error')
    except Exception as e:
        flash(str(e), 'error')
    return redirect(url_for('admin'))
@app.route('/admin/user/<int:user_id>/edit/role/', methods = ['POST'])
def admin_set_role(user_id):
    '''
    Assign a role to a user.
    The role is submitted as one of the strings `0`, `1` or `2`.
    '''
    allowed, response = _admin_check()
    if not allowed:
        return response
    try:
        role = request.form['role']
        if role in ('0', '1', '2'):
            db.set_user_role(user_id, role)
            flash('Set user role', 'success')
        else:
            flash(f'Invalid role type ({role})', 'error')
    except Exception as err:
        flash(str(err), 'error')
    return redirect(url_for('admin'))
@app.route('/admin/restart/', methods = ['POST'])
def admin_restart():
    '''
    Restart the workers on request of an admin.
    '''
    allowed, response = _admin_check()
    if allowed:
        restart()
        response = redirect(url_for('admin'))
    return response
@app.route('/thread/<int:thread_id>/hide/', methods = ['POST'])
def set_hide_thread(thread_id):
    '''
    Hide or unhide a thread.
    Any value other than `0` in the `hide` form field hides the thread.
    '''
    allowed, response = _moderator_check()
    if not allowed:
        return response
    try:
        hide = request.form['hide'] != '0'
        hide_str = ('Unhidden', 'Hidden')[hide]
        if not db.set_thread_hidden(thread_id, hide):
            flash(f'Failed to {hide_str.lower()} thread', 'error')
        else:
            flash(f'{hide_str} thread', 'success')
    except Exception as err:
        flash(str(err), 'error')
    # NOTE(review): the redirect target is taken verbatim from the form - confirm that only
    # same-site values can end up here.
    return redirect(request.form['redirect'])
@app.route('/comment/<int:comment_id>/hide/', methods = ['POST'])
def set_hide_comment(comment_id):
    '''
    Hide or unhide a comment.
    Any value other than `0` in the `hide` form field hides the comment.
    '''
    allowed, response = _moderator_check()
    if not allowed:
        return response
    try:
        hide = request.form['hide'] != '0'
        hide_str = ('Unhidden', 'Hidden')[hide]
        if not db.set_comment_hidden(comment_id, hide):
            flash(f'Failed to {hide_str.lower()} comment', 'error')
        else:
            flash(f'{hide_str} comment', 'success')
    except Exception as err:
        flash(str(err), 'error')
    # NOTE(review): the redirect target is taken verbatim from the form - confirm that only
    # same-site values can end up here.
    return redirect(request.form['redirect'])
# TODO can probably be a static-esque page, maybe?
@app.route('/help/')
def help():
    '''
    Render the help page.
    '''
    return render_template(
        'help.html',
        title = 'Help',
        # Passed explicitly like on every other page. Without it the name `config` in the
        # template refers to Flask's own application config instead of the server config.
        config = config,
        user = get_user(),
    )
def _moderator_check():
    '''
    Check whether the current user is at least a moderator.

    Returns `(True, user)` if so. Otherwise returns `(False, response)`, where the response is
    a redirect to the login page for anonymous visitors and a 403 page for everybody else.
    '''
    user = get_user()
    if user is None:
        return False, redirect(url_for('login'))
    if user.is_moderator():
        return True, user
    return False, ('<h1>Forbidden</h1>', 403)
def _admin_check():
    '''
    Check whether the current user is an admin.

    Returns `(True, user)` if so. Otherwise returns `(False, response)`, where the response is
    a redirect to the login page for anonymous visitors and a 403 page for everybody else.
    '''
    user = get_user()
    if user is None:
        return False, redirect(url_for('login'))
    if user.is_admin():
        return True, user
    return False, ('<h1>Forbidden</h1>', 403)
class Comment:
    '''
    A comment as a node of a comment tree.
    `children` starts out empty and is filled in by `create_comment_tree`.
    '''
    def __init__(self, id, parent_id, author_id, author, text, create_time, modify_time, hidden):
        self.id = id
        self.author_id = author_id
        self.author = author
        self.text = text
        self.children = []
        self.create_time = create_time
        self.modify_time = modify_time
        self.parent_id = parent_id
        self.hidden = hidden


def create_comment_tree(comments, user):
    '''
    Turn a flat sequence of comment rows into a tree and return the list of root comments.

    Each row is a tuple with the positional arguments of `Comment`, the comment id first.
    `user` is the viewer, or `None` for anonymous visitors. Hidden comments are only included
    for moderators, or when the viewer replied to them (directly or indirectly).
    Every level of the tree is sorted by modification time, newest first.
    '''
    # Collect comments first, then build the tree in case we encounter a child before a parent
    comment_map = { v[0]: Comment(*v) for v in comments }
    root = []
    # We should keep showing hidden comments if the user replied to them, directly or indirectly.
    # To do that, keep track of user comments, then walk up the tree and insert hidden comments.
    user_comments = []
    is_moderator = bool(user and user.is_moderator())

    # Build tree
    def insert(comment):
        parent = comment_map.get(comment.parent_id)
        if parent is not None:
            parent.children.append(comment)
        else:
            root.append(comment)

    for comment in comment_map.values():
        if comment.hidden and not is_moderator:
            continue
        insert(comment)
        if user and not is_moderator and comment.author_id == user.id:
            user_comments.append(comment)

    # Insert replied-to hidden comments. Several user comments can share a hidden ancestor,
    # so remember which ones were inserted already to avoid duplicates in the tree.
    revealed = set()
    for c in user_comments:
        while c is not None:
            if c.hidden:
                if c.id in revealed:
                    # Its ancestors were handled when it was revealed the first time.
                    break
                revealed.add(c.id)
                insert(c)
            c = comment_map.get(c.parent_id)

    # Sort each level based on modify time, newest first
    def sort_time(l):
        l.sort(key=lambda c: c.modify_time, reverse=True)
        for c in l:
            sort_time(c.children)

    sort_time(root)
    return root
class User:
    '''
    The logged-in user: id, name, role and the timestamp (in nanoseconds) until which the
    user is banned.
    '''
    def __init__(self, id, name, role, banned_until):
        self.id, self.name = id, name
        self.role, self.banned_until = role, banned_until

    def is_moderator(self):
        '''
        Whether the user has moderator privileges; admins count as moderators too.
        '''
        return self.role == Role.ADMIN or self.role == Role.MODERATOR

    def is_admin(self):
        '''
        Whether the user is an admin.
        '''
        return Role.ADMIN == self.role

    def is_banned(self):
        '''
        Whether the ban of the user has not expired yet.
        '''
        return time.time_ns() < self.banned_until
def get_user():
    '''
    Return the `User` belonging to the current session, or `None` if nobody is logged in.
    '''
    user_id = session.get('user_id')
    if user_id is None:
        return None
    name, role, banned_until = db.get_user_name_role_banned(user_id)
    return User(user_id, name, role, banned_until)
def register_user(show_password):
    '''
    Create an account from the submitted form (`username`, `password`, `captcha`, `answer`)
    and log the new user in.

    If `show_password` is set, the success message repeats the chosen password.
    Returns `True` on success. On failure an error is flashed and `False` is returned.
    '''
    # Imported locally so this function does not rely on a change to the file-level imports.
    from html import escape

    username, passwd = request.form['username'], request.form['password']
    if any(c in username for c in string.whitespace):
        # This error is more ergonomic in case someone tries to play tricks again :)
        flash('Username may not contain whitespace', 'error')
    elif len(username) < 3:
        flash('Username must be at least 3 characters long', 'error')
    elif len(passwd) < 8:
        flash('Password must be at least 8 characters long', 'error')
    elif not captcha.verify(
        config.captcha_key,
        request.form['captcha'],
        request.form['answer'],
    ):
        flash('CAPTCHA answer is incorrect', 'error')
    else:
        uid = db.register_user(username, password.hash(passwd), time.time_ns())
        if uid is None:
            flash('Failed to create account (username may already be taken)', 'error')
        else:
            s = 'Account has been created.'
            if show_password:
                # The message contains markup, so it is presumably rendered unescaped. Escape
                # the password so that characters such as `<` and `&` are shown literally.
                s += f' Your password is <code class=spoiler>{escape(passwd)}</code> (hover to reveal).'
            flash(s, 'success')
            uid, = uid
            session['user_id'] = uid
            session.permanent = True
            return True
    return False
@app.context_processor
def utility_processor():
    '''
    Make a set of helper functions available to every template.
    All timestamps handled by the helpers are in nanoseconds.
    '''
    def _plural(count, unit):
        # "1 minute", but "2 minutes".
        return f'{count} {unit}{"s" if count > 1 else ""}'

    def _format_time_delta(n, t):
        '''
        Describe the time between the timestamps `t` and `n` in words. `n` must not be
        smaller than `t`.
        '''
        # Try the sane thing first
        dt = (n - t) // 10 ** 9
        if dt < 1:
            return "less than a second"
        if dt < 60:
            return _plural(dt, 'second')
        if dt < 3600:
            return _plural(dt // 60, 'minute')
        if dt < 3600 * 24:
            return _plural(dt // 3600, 'hour')
        if dt < 3600 * 24 * 31:
            return _plural(dt // (3600 * 24), 'day')
        # Try some very rough estimate, whatever
        then, now = (datetime.utcfromtimestamp(x // 10 ** 9) for x in (t, n))
        if then.year < now.year:
            return _plural(now.year - then.year, 'year')
        if then.month < now.month:
            return _plural(now.month - then.month, 'month')
        # At least 31 days apart means a different month or year. Raised explicitly so the
        # check is not stripped when running with -O.
        raise AssertionError('unreachable')

    def format_since(t):
        n = time.time_ns()
        if n < t:
            return 'in a distant future'
        return _format_time_delta(n, t) + ' ago'

    def format_until(t):
        n = time.time_ns()
        if t <= n:
            return 'in a distant past'
        return _format_time_delta(t, n)

    def format_time(t):
        return datetime.utcfromtimestamp(t / 10 ** 9).replace(microsecond=0)

    def rand_password():
        '''
        Generate a random password.
        The current implementation returns 12 random lower- and uppercase alphabet characters.
        This gives up to `log((26 * 2) ** 12) / log(2) = ~68` bits of entropy, which should be
        enough for the foreseeable future.
        '''
        return ''.join(string.ascii_letters[secrets.randbelow(52)] for _ in range(12))

    def gen_captcha():
        return captcha.generate(config.captcha_key)

    return {
        'format_since': format_since,
        'format_time': format_time,
        'format_until': format_until,
        'minimd': minimd.html,
        'rand_password': rand_password,
        'gen_captcha': gen_captcha,
    }
def restart():
    '''
    Shut down *all* workers and spawn new ones.
    This is necessary on e.g. a configuration change.
    Since restarting workers is platform-dependent this task is delegated to an external
    program. Its exit code decides which message is flashed.
    '''
    exit_code = subprocess.call(['./restart.sh'])
    if exit_code != 0:
        flash(f'Restart script exited with error (code {exit_code})', 'error')
    else:
        flash('Restart script exited successfully', 'success')
def trim_text(s):
    '''
    Remove every carriage return from `s`, because browsers love to submit them.
    All other characters, whitespace included, are left untouched.
    '''
    return ''.join(s.split('\r'))