333 lines
9.0 KiB
Python
333 lines
9.0 KiB
Python
from datetime import datetime, timezone
|
|
from flask import current_app as app
|
|
from flask import g
|
|
from werkzeug.utils import secure_filename
|
|
import hashlib
|
|
import html
|
|
import os
|
|
import sqlite3
|
|
|
|
|
|
def connect_db():
|
|
return sqlite3.connect(app.config['DATABASE'])
|
|
|
|
|
|
def create_db(db_file):
|
|
db = sqlite3.connect(db_file)
|
|
cur = db.cursor()
|
|
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS answers (
|
|
question_set TEXT,
|
|
question TEXT,
|
|
answer TEXT,
|
|
answer_type TEXT
|
|
);
|
|
""")
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS tokens (
|
|
token TEXT PRIMARY KEY,
|
|
question_set TEXT,
|
|
answered BOOLEAN,
|
|
role TEXT
|
|
);
|
|
"""
|
|
)
|
|
db.commit()
|
|
|
|
|
|
def get_hash(s):
|
|
return hashlib.sha224(s.encode('utf-8')).hexdigest()
|
|
|
|
|
|
def get_token_counts(db, key):
|
|
cur = db.cursor()
|
|
|
|
cur.execute(
|
|
"SELECT count(*) FROM tokens WHERE answered = 'true' AND question_set = ? AND role = 'voter'",
|
|
(
|
|
key,
|
|
)
|
|
)
|
|
used_tokens = cur.fetchall()[0][0]
|
|
cur.execute(
|
|
"SELECT count(*) FROM tokens WHERE answered = 'false' AND question_set = ? AND role = 'voter'",
|
|
(
|
|
key,
|
|
)
|
|
)
|
|
unused_tokens = cur.fetchall()[0][0]
|
|
tokens = {
|
|
'unused': unused_tokens,
|
|
'used': used_tokens,
|
|
'total': used_tokens + unused_tokens
|
|
}
|
|
return tokens
|
|
|
|
|
|
def get_summary(db, key):
|
|
""" returns summary for a vote event """
|
|
questions = []
|
|
answers = {}
|
|
cur = db.cursor()
|
|
|
|
cur.execute(
|
|
"SELECT question, answer, answer_type FROM answers WHERE question_set = ?",
|
|
(
|
|
key,
|
|
)
|
|
)
|
|
for row in cur:
|
|
if row[0] not in answers.keys():
|
|
questions.append(row[0])
|
|
answers[row[0]] = {
|
|
'answers': {},
|
|
'answer_type': row[2]
|
|
}
|
|
|
|
if row[1] not in answers[row[0]]['answers'].keys():
|
|
answers[row[0]]['answers'][row[1]] = 0
|
|
answers[row[0]]['answers'][row[1]] += 1
|
|
|
|
return questions, answers
|
|
|
|
|
|
def has_voted(key, token):
|
|
if token == None:
|
|
return True
|
|
cur = g.db.cursor()
|
|
cur.execute(
|
|
"SELECT token FROM tokens WHERE token = ? AND answered = 'true' AND question_set = ? AND role = 'voter'",
|
|
(
|
|
get_hash(token),
|
|
key
|
|
)
|
|
)
|
|
return len(cur.fetchall()) > 0
|
|
|
|
|
|
def is_observer(key, token):
|
|
if token == None:
|
|
return False
|
|
cur = g.db.cursor()
|
|
cur.execute(
|
|
"SELECT token FROM tokens WHERE token = ? AND question_set = ? AND role = 'observer'",
|
|
(
|
|
get_hash(token),
|
|
key
|
|
)
|
|
)
|
|
return len(cur.fetchall()) > 0
|
|
|
|
|
|
def is_closed_vote(form):
|
|
return form['vote_style'] == 'closed'
|
|
|
|
|
|
def is_draft(form):
|
|
return form['draft']
|
|
|
|
|
|
def is_expired(form):
|
|
if form['expires'] == None:
|
|
return False
|
|
return datetime.now(timezone.utc) > form['expires']
|
|
|
|
|
|
def is_opened(form):
|
|
if form['opens'] == None:
|
|
return True
|
|
return datetime.now(timezone.utc) > form['opens']
|
|
|
|
|
|
def is_key(key, cli_opts = False):
|
|
key = secure_filename(key)
|
|
|
|
if cli_opts:
|
|
root_path = cli_opts.questions
|
|
else:
|
|
root_path = app.config['QUESTIONS']
|
|
|
|
return os.path.exists(
|
|
os.path.join(
|
|
root_path,
|
|
key + ".txt"
|
|
)
|
|
)
|
|
|
|
|
|
def is_show_results(form):
|
|
return form['show_results']
|
|
|
|
|
|
def is_voter(key, token):
|
|
if token == None:
|
|
return False
|
|
cur = g.db.cursor()
|
|
cur.execute(
|
|
"SELECT token FROM tokens WHERE token = ? AND question_set = ? AND role = 'voter'",
|
|
(
|
|
get_hash(token),
|
|
key
|
|
)
|
|
)
|
|
return len(cur.fetchall()) > 0
|
|
|
|
def parse_form(key):
|
|
form = {
|
|
'expires': None,
|
|
'opens': None,
|
|
'draft': False,
|
|
'vote_style': "closed",
|
|
'show_results': False,
|
|
'questions': [],
|
|
'title': "",
|
|
'can_submit': True,
|
|
'message': ''
|
|
}
|
|
key = secure_filename(key)
|
|
try:
|
|
current_question = 0
|
|
with open(os.path.join(app.config['QUESTIONS'], key + ".txt"), "rt", encoding = "UTF-8") as fp:
|
|
for row in fp:
|
|
if row.strip() == "":
|
|
continue
|
|
rowsl = row.lower().rstrip()
|
|
if row.startswith("#"):
|
|
continue
|
|
if rowsl.startswith("title: "):
|
|
form['title'] = row[6:].strip()
|
|
g.title = row[6:].strip()
|
|
continue
|
|
if rowsl.startswith("expires: "):
|
|
form['expires'] = parse_row_date(row)
|
|
continue
|
|
if rowsl.startswith("opens: "):
|
|
form['opens'] = parse_row_date(row)
|
|
continue
|
|
if rowsl.startswith("draft: "):
|
|
if rowsl == "draft: true":
|
|
form['draft'] = True
|
|
continue
|
|
if rowsl.startswith("vote_style: "):
|
|
if rowsl == "vote_style: open":
|
|
form['vote_style'] = "open"
|
|
continue
|
|
if rowsl.startswith("show_results: "):
|
|
if rowsl == "show_results: true":
|
|
form['show_results'] = True
|
|
continue
|
|
if row.startswith("- "):
|
|
if current_question == None:
|
|
continue
|
|
form['questions'][current_question]['choices'].append(
|
|
row[2:].strip()
|
|
)
|
|
continue
|
|
if row.startswith("+ "):
|
|
if current_question == None:
|
|
continue
|
|
form['questions'][current_question]['multichoices'].append(
|
|
row[2:].strip()
|
|
)
|
|
continue
|
|
current_question = len(form['questions'])
|
|
form['questions'].append({
|
|
'choices': [],
|
|
'multichoices': [],
|
|
'index': current_question + 1,
|
|
'name': row.strip().rstrip("_").rstrip(),
|
|
'open_question': row.strip().endswith("___"),
|
|
'autoformat': not rowsl.startswith("<")
|
|
})
|
|
return form
|
|
except Exception as err:
|
|
if app.config['DEBUG']:
|
|
raise err
|
|
return False
|
|
|
|
|
|
def parse_row_date(row):
|
|
row = " ".join(row.split(" ")[1:]).strip()
|
|
if row.lower() == "none":
|
|
return None
|
|
try:
|
|
return datetime.strptime(
|
|
row,
|
|
'%Y-%m-%d %H:%M %z'
|
|
)
|
|
except Exception as err:
|
|
if app.config['DEBUG']:
|
|
print(row)
|
|
raise err
|
|
return None
|
|
|
|
|
|
def write_vote(key, token, answers, form):
|
|
|
|
cur = g.db.cursor()
|
|
for question in form['questions']:
|
|
answer = None
|
|
answer_type = None
|
|
if 'QC%d'%( question['index'], ) in answers:
|
|
answer = (answers['QC%d'%( question['index'], )],)
|
|
answer_type = "single"
|
|
if 'QM%d'%( question['index'], ) in answers:
|
|
answer = answers.getlist('QM%d'%( question['index'], ))
|
|
answer_type = "multiple"
|
|
if 'QO%d'%( question['index'], ) in answers:
|
|
answer = (answers['QO%d'%( question['index'], )],)
|
|
answer_type = "open"
|
|
if answer == None:
|
|
continue
|
|
if answer_type == None:
|
|
continue
|
|
for single in answer:
|
|
cur.execute(
|
|
"INSERT INTO answers (question_set, question, answer, answer_type) VALUES (?, ?, ?, ?)",
|
|
(
|
|
key,
|
|
question['name'],
|
|
html.escape(single).strip(),
|
|
answer_type
|
|
)
|
|
)
|
|
if is_closed_vote(form):
|
|
cur.execute(
|
|
"UPDATE tokens SET answered = 'true' WHERE token = ? AND question_set = ?",
|
|
(
|
|
get_hash(token),
|
|
key
|
|
)
|
|
)
|
|
g.db.commit()
|
|
|
|
|
|
def sort_summary(questions, answers):
|
|
sorted_answer_list = []
|
|
for q,a in zip(questions, answers):
|
|
sum_answers = sum([answers[q]['answers'][x] for x in answers[q]['answers']])
|
|
sorted_answers = sorted(
|
|
[{"answer_type": answers[q]['answer_type'] , "answer": x, "count": answers[q]['answers'][x], "percent": int(100 * float(answers[q]['answers'][x]) / sum_answers)} for x in answers[q]['answers']],
|
|
key = lambda i: -i['count']
|
|
)
|
|
sorted_answer_list.append(sorted_answers)
|
|
return questions, sorted_answer_list
|
|
|
|
|
|
def time_to(what, form):
|
|
if not what in ('expires','opens'):
|
|
raise AttributeError("Dont know that attribute")
|
|
|
|
if form[what] == None:
|
|
if what == 'expires':
|
|
return "Never"
|
|
if what == 'opens':
|
|
return "None"
|
|
|
|
time_to_go = form[what] - datetime.now(timezone.utc)
|
|
time_to_go = ".".join(str(time_to_go).split('.')[0:-1])[0:-3]
|
|
|
|
#time_to_go.microseconds = 0
|
|
return "%s (%s to go)"%( form[what], time_to_go )
|