Files
abot/utils.py
2018-12-04 20:58:24 +02:00

285 lines
7.5 KiB
Python

from datetime import datetime, timezone
from flask import current_app as app
from flask import g
import os
from werkzeug.utils import secure_filename
def create_result_table(key):
    """Create the answers table for vote event ``key`` if it is missing.

    The table is named after ``key`` and has three TEXT columns:
    ``question``, ``answer`` and ``answer_type``.  The statement runs on
    the connection stored in flask's ``g.db`` and is committed right away.
    """
    # A table name cannot be passed as a bound SQL parameter, so it is
    # interpolated into the statement text.
    statement = (
        "\n"
        "CREATE TABLE IF NOT EXISTS `%s` (\n"
        "question TEXT,\n"
        "answer TEXT,\n"
        "answer_type TEXT\n"
        ");\n"
    ) % (key,)
    g.db.cursor().execute(statement)
    g.db.commit()
def create_voter_table(db, name):
    """Create the voter-token table for vote event ``name`` if it is missing.

    The table name comes from ``get_voter_table_name(name)``.  Each row holds
    a ``token`` (primary key) and an ``answered`` flag.  The change is
    committed on ``db`` before returning.
    """
    voters_table = get_voter_table_name(name)
    # A table name cannot be passed as a bound SQL parameter, so it is
    # interpolated into the statement text.
    statement = (
        "\n"
        "CREATE TABLE IF NOT EXISTS `%s` (\n"
        "token TEXT PRIMARY KEY,\n"
        "answered BOOLEAN\n"
        ");\n"
    ) % (voters_table,)
    db.cursor().execute(statement)
    db.commit()
def get_voter_table_name(key):
    """Return the name of the voter-token table belonging to event ``key``."""
    suffix = "__voters"
    return key + suffix
def get_result_table_name(key):
    """Return the name of the answers table belonging to event ``key``.

    The answers table is named after the event key, so the key is returned
    unchanged.
    """
    table_name = key
    return table_name
def get_token_counts(db, key):
    """Count the voter tokens of vote event ``key``.

    Returns a dict with the keys ``'unused'``, ``'used'`` and ``'total'``.
    All three are 0 when the event has no voter table.
    """
    voters_table = get_voter_table_name(key)
    cursor = db.cursor()

    # Check sqlite_master first so a missing table gives zero counts
    # instead of an SQL error.
    cursor.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?;",
        (voters_table,),
    )
    if not cursor.fetchall():
        used = 0
        unused = 0
    else:
        cursor.execute(
            "SELECT count(*) FROM `%s` WHERE answered = 'true'" % (voters_table,)
        )
        used = cursor.fetchall()[0][0]
        cursor.execute(
            "SELECT count(*) FROM `%s` WHERE answered = 'false'" % (voters_table,)
        )
        unused = cursor.fetchall()[0][0]

    return {
        'unused': unused,
        'used': used,
        'total': used + unused,
    }
def get_summary(db, key):
    """Tally the recorded answers of vote event ``key``.

    Returns a ``(questions, answers)`` tuple.  ``questions`` lists the
    question texts in the order they are first read from the table.
    ``answers`` maps each question text to a dict with:

    * ``'answers'``: a dict mapping each answer text to its number of rows;
    * ``'answer_type'``: the answer type of the first row read for that
      question.

    When the event has no answers table, ``([], {})`` is returned.
    """
    questions = []
    answers = {}
    cursor = db.cursor()

    # Check sqlite_master first so a missing table gives an empty summary
    # instead of an SQL error.
    cursor.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?;",
        (key,),
    )
    if cursor.fetchall():
        cursor.execute(
            "SELECT question, answer, answer_type FROM `%s`" % (key,)
        )
        for question, answer, answer_type in cursor:
            if question not in answers:
                questions.append(question)
                answers[question] = {
                    'answers': {},
                    'answer_type': answer_type,
                }
            tally = answers[question]['answers']
            tally[answer] = tally.get(answer, 0) + 1

    return questions, answers
def has_voted(key, token):
    """Tell whether ``token`` has already been used to vote in event ``key``.

    A missing token (``None``) is reported as already voted, and the
    database is not consulted in that case.  Otherwise the event's voter
    table (on ``g.db``) is searched for a row with this token and
    ``answered = 'true'``.

    Returns ``True`` if such a row exists, ``False`` otherwise.
    """
    if token is None:
        return True
    cur = g.db.cursor()
    # The table name is backtick-quoted, matching how the voter table is
    # created and counted elsewhere in this module.  Unquoted, a key that
    # is not a bare SQL identifier (e.g. one containing '-') would make
    # this statement fail.
    cur.execute(
        "SELECT token FROM `%s` WHERE token = ? AND answered = 'true'" % (
            get_voter_table_name(key),
        ),
        (token,),
    )
    return cur.fetchone() is not None
def is_closed_vote(form):
    """Return True when the form's ``'vote_style'`` setting is ``'closed'``."""
    vote_style = form['vote_style']
    return vote_style == 'closed'
def is_draft(form):
    """Return the form's ``'draft'`` setting unchanged."""
    draft_flag = form['draft']
    return draft_flag
def is_expired(form):
    """Tell whether the form's expiry time has passed.

    ``form['expires']`` is either ``None`` (the form never expires) or a
    datetime that is compared against the current UTC time.  It must be
    timezone-aware for that comparison to work.

    Returns ``True`` when the current time is later than the expiry time.
    """
    expires = form['expires']
    # Identity test: None is a singleton, and ``==`` would go through the
    # datetime's own comparison logic.
    if expires is None:
        return False
    return datetime.now(timezone.utc) > expires
def is_key(key, cli_opts = False):
    """Tell whether a question file exists for vote event ``key``.

    ``key`` is passed through ``secure_filename`` and looked up as
    ``<root>/<key>.txt``.  The root directory is ``cli_opts.questions`` when
    ``cli_opts`` is truthy, otherwise the app's ``QUESTIONS`` config value.
    """
    safe_key = secure_filename(key)
    root_path = cli_opts.questions if cli_opts else app.config['QUESTIONS']
    question_file = os.path.join(root_path, safe_key + ".txt")
    return os.path.exists(question_file)
def is_show_results(form):
    """Return the form's ``'show_results'`` setting unchanged."""
    show_flag = form['show_results']
    return show_flag
def parse_form(key):
    """Parse the question file of vote event ``key`` into a form dict.

    ``key`` is passed through ``secure_filename`` and the file
    ``<QUESTIONS>/<key>.txt`` is read line by line:

    * blank lines and lines starting with ``#`` are skipped;
    * ``expires: ...``, ``draft: ...``, ``vote_style: ...`` and
      ``show_results: ...`` (matched case-insensitively) set the form
      settings; the defaults are no expiry, not a draft, closed vote and
      results hidden;
    * ``- text`` adds a choice and ``+ text`` a multichoice to the most
      recent question; such lines before the first question are ignored;
    * any other line starts a new question.

    Each question is a dict with ``'choices'``, ``'multichoices'``,
    ``'index'`` (1-based), ``'name'``, ``'open_question'`` (line ends with
    ``___``) and ``'autoformat'`` (line does not start with ``<``).

    Returns the form dict, or ``False`` if reading or parsing fails.  When
    the app's ``DEBUG`` config is truthy the exception is re-raised instead.
    """
    form = {
        'expires': None,
        'draft': False,
        'vote_style': "closed",
        'show_results': False,
        'questions': []
    }
    key = secure_filename(key)
    try:
        # No question has been seen yet.  This must start as None, not 0:
        # with 0 the guard on option lines never fires, and an option line
        # before the first question fails the whole parse with IndexError.
        current_question = None
        with open(os.path.join(app.config['QUESTIONS'], key + ".txt"), "rt") as fp:
            for row in fp:
                if row.strip() == "":
                    continue
                if row.startswith("#"):
                    continue
                rowsl = row.lower().rstrip()

                # Settings lines: the keyword is matched on the lowercased row.
                if rowsl.startswith("expires: "):
                    form['expires'] = parse_row_date(row)
                    continue
                if rowsl.startswith("draft: "):
                    if rowsl == "draft: true":
                        form['draft'] = True
                    continue
                if rowsl.startswith("vote_style: "):
                    if rowsl == "vote_style: open":
                        form['vote_style'] = "open"
                    continue
                if rowsl.startswith("show_results: "):
                    if rowsl == "show_results: true":
                        form['show_results'] = True
                    continue

                # Option lines attach to the most recent question.
                if row.startswith(("- ", "+ ")):
                    if current_question is not None:
                        kind = 'choices' if row[0] == "-" else 'multichoices'
                        form['questions'][current_question][kind].append(
                            row[2:].strip()
                        )
                    continue

                # Anything else starts a new question.
                current_question = len(form['questions'])
                form['questions'].append({
                    'choices': [],
                    'multichoices': [],
                    'index': current_question + 1,
                    'name': row.strip().rstrip("_:").rstrip(),
                    'open_question': row.strip().endswith("___"),
                    'autoformat': not rowsl.startswith("<")
                })
        return form
    except Exception:
        # Deliberate catch-all: callers get False for any unreadable or
        # malformed form unless the app runs in debug mode.
        if app.config['DEBUG']:
            raise
        return False
def parse_row_date(row):
    """Parse the value of an ``expires: ...`` line into a datetime.

    The first 9 characters (the length of ``"expires: "``) are dropped and
    the rest is stripped.  The value ``none`` (any case) means no expiry and
    yields ``None``.  Anything else must match ``YYYY-MM-DD HH:MM +ZZZZ``.

    Returns the parsed timezone-aware datetime, or ``None`` when the value
    cannot be parsed.  When the app's ``DEBUG`` config is truthy, the
    offending value is printed and the exception is re-raised instead.
    """
    value = row[9:].strip()
    if value.lower() == "none":
        return None
    try:
        parsed = datetime.strptime(value, '%Y-%m-%d %H:%M %z')
    except Exception:
        if app.config['DEBUG']:
            print(value)
            raise
        return None
    return parsed
def write_vote(key, token, answers, form):
    """Store one submitted ballot for vote event ``key``.

    For every question in ``form['questions']`` the submitted ``answers``
    are checked for the fields ``QC<index>`` (stored as type "single"),
    ``QM<index>`` ("multiple", all values via ``answers.getlist``) and
    ``QO<index>`` ("open").  If several are present the last one checked
    wins.  Each answer value is stripped and inserted as one row in the
    event's answers table.  Questions with no submitted field are skipped.

    For a closed vote the row for ``token`` in the voter table is then
    marked ``answered = 'true'``.  Everything is committed on ``g.db`` at
    the end.

    NOTE(review): ``answers`` is presumably a werkzeug MultiDict such as
    ``request.form`` (it must support ``in``, item access and
    ``getlist``) -- confirm against the caller.
    """
    cur = g.db.cursor()
    for question in form['questions']:
        index = question['index']
        answer = None
        answer_type = None
        if 'QC%d' % (index,) in answers:
            answer = (answers['QC%d' % (index,)],)
            answer_type = "single"
        if 'QM%d' % (index,) in answers:
            answer = answers.getlist('QM%d' % (index,))
            answer_type = "multiple"
        if 'QO%d' % (index,) in answers:
            answer = (answers['QO%d' % (index,)],)
            answer_type = "open"
        if answer is None or answer_type is None:
            continue
        for single in answer:
            cur.execute(
                "INSERT INTO `%s` VALUES (?, ?, ?)" % (key,),
                (
                    question['name'],
                    single.strip(),
                    answer_type,
                ),
            )
    if is_closed_vote(form):
        # The table name is backtick-quoted like the INSERT above and like
        # the statement that creates the voter table.  Unquoted, a key that
        # is not a bare SQL identifier (e.g. one containing '-') would make
        # this UPDATE fail after the answers were already inserted.
        cur.execute(
            "UPDATE `%s` SET answered = 'true' WHERE token = ?" % (
                get_voter_table_name(key),
            ),
            (token,),
        )
    g.db.commit()
def time_to_expiry(form):
    """Describe when the form expires and how long that is from now.

    Returns ``"Never"`` when ``form['expires']`` is ``None``.  Otherwise
    returns ``"<expires> (<remaining> to go)"``, where ``<remaining>`` is the
    time left until expiry (measured against the current UTC time) with
    seconds and fractions dropped, e.g. ``"2 days, 3:29"``.
    """
    expires = form['expires']
    if expires is None:
        return "Never"
    remaining = expires - datetime.now(timezone.utc)
    # str(timedelta) looks like "[D day[s], ]H:MM:SS[.ffffff]".  The
    # fraction is absent when the microseconds are exactly zero, so take
    # the part before the optional '.' rather than assuming it is there.
    # Then drop the trailing ":SS".
    time_to_go = str(remaining).partition('.')[0][:-3]
    return "%s (%s to go)" % (expires, time_to_go)