initial working state
This commit is contained in:
191
utils.py
Normal file
191
utils.py
Normal file
@@ -0,0 +1,191 @@
|
||||
from datetime import datetime, timezone
|
||||
from flask import current_app as app
|
||||
from flask import g
|
||||
import os
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
def create_result_table(key):
|
||||
cur = g.db.cursor()
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS `%s` (
|
||||
question TEXT,
|
||||
answer TEXT,
|
||||
answer_type TEXT
|
||||
);
|
||||
"""%(key, )
|
||||
)
|
||||
g.db.commit()
|
||||
|
||||
def create_voter_table(db, name):
|
||||
table_name = get_voter_table_name(name)
|
||||
cur = db.cursor()
|
||||
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS `%s` (
|
||||
token TEXT PRIMARY KEY,
|
||||
answered BOOLEAN
|
||||
);
|
||||
"""%(table_name, )
|
||||
)
|
||||
db.commit()
|
||||
|
||||
def get_voter_table_name(key):
|
||||
return key + "__voters"
|
||||
|
||||
|
||||
def get_result_table_name(key):
|
||||
return key
|
||||
|
||||
|
||||
def has_voted(key, token):
|
||||
cur = g.db.cursor()
|
||||
cur.execute(
|
||||
"SELECT token FROM %s WHERE token = ? AND answered = 'true'"%(
|
||||
get_voter_table_name(key),
|
||||
),
|
||||
(
|
||||
token,
|
||||
)
|
||||
)
|
||||
return len(cur.fetchall()) > 0
|
||||
|
||||
|
||||
def is_draft(form):
|
||||
return form['draft']
|
||||
|
||||
def is_expired(form):
|
||||
if form['expires'] == None:
|
||||
return False
|
||||
|
||||
return datetime.now(timezone.utc) > form['expires']
|
||||
|
||||
def is_key(key, cli_opts = False):
|
||||
key = secure_filename(key)
|
||||
|
||||
if cli_opts:
|
||||
root_path = cli_opts.questions
|
||||
else:
|
||||
root_path = app.config['QUESTIONS']
|
||||
|
||||
return os.path.exists(
|
||||
os.path.join(
|
||||
root_path,
|
||||
key + ".txt"
|
||||
)
|
||||
)
|
||||
|
||||
def parse_form(key):
|
||||
form = {
|
||||
'expires': None,
|
||||
'draft': False,
|
||||
'questions': []
|
||||
}
|
||||
key = secure_filename(key)
|
||||
try:
|
||||
current_question = None
|
||||
with open(os.path.join(app.config['QUESTIONS'], key + ".txt"), "rt") as fp:
|
||||
for row in fp:
|
||||
if row.strip() == "":
|
||||
continue
|
||||
if row.startswith("#"):
|
||||
continue
|
||||
if row.lower().startswith("expires: "):
|
||||
form['expires'] = parse_row_date(row)
|
||||
continue
|
||||
if row.lower().startswith("draft: "):
|
||||
if row.lower().rstrip() == "draft: true":
|
||||
form['draft'] = True
|
||||
continue
|
||||
if row.startswith("- "):
|
||||
if current_question == None:
|
||||
continue
|
||||
form['questions'][current_question]['choices'].append(
|
||||
row[2:].strip()
|
||||
)
|
||||
continue
|
||||
if row.startswith("+ "):
|
||||
if current_question == None:
|
||||
continue
|
||||
form['questions'][current_question]['multichoices'].append(
|
||||
row[2:].strip()
|
||||
)
|
||||
continue
|
||||
current_question = len(form['questions'])
|
||||
form['questions'].append({
|
||||
'choices': [],
|
||||
'multichoices': [],
|
||||
'index': len(form['questions']) + 1,
|
||||
'name': row.strip().rstrip("_:").rstrip(),
|
||||
'open_question': row.strip().endswith("___")
|
||||
})
|
||||
return form
|
||||
except Exception as err:
|
||||
if app.config['DEBUG']:
|
||||
raise err
|
||||
return False
|
||||
|
||||
|
||||
def parse_row_date(row):
|
||||
row = row[9:].strip()
|
||||
if row.lower() == "none":
|
||||
return None
|
||||
try:
|
||||
return datetime.strptime(
|
||||
row,
|
||||
'%Y-%m-%d %H:%M %z'
|
||||
)
|
||||
except Exception as err:
|
||||
if app.config['DEBUG']:
|
||||
print(row)
|
||||
raise err
|
||||
return None
|
||||
|
||||
def write_vote(key, token, answers, form):
|
||||
|
||||
cur = g.db.cursor()
|
||||
for question in form['questions']:
|
||||
answer = None
|
||||
answer_type = None
|
||||
if 'QC%d'%( question['index'], ) in answers:
|
||||
answer = (answers['QC%d'%( question['index'], )],)
|
||||
answer_type = "single"
|
||||
if 'QM%d'%( question['index'], ) in answers:
|
||||
answer = answers.getlist('QM%d'%( question['index'], ))
|
||||
answer_type = "multiple"
|
||||
if 'QO%d'%( question['index'], ) in answers:
|
||||
answer = (answers['QO%d'%( question['index'], )],)
|
||||
answer_type = "open"
|
||||
if answer == None:
|
||||
continue
|
||||
for single in answer:
|
||||
cur.execute(
|
||||
"INSERT INTO %s VALUES (?, ?, ?)"%(
|
||||
key,
|
||||
),
|
||||
(
|
||||
question['name'],
|
||||
single.strip(),
|
||||
answer_type
|
||||
)
|
||||
)
|
||||
|
||||
cur.execute(
|
||||
"UPDATE %s SET answered = 'true' WHERE token = ?"%(
|
||||
get_voter_table_name(key),
|
||||
),
|
||||
(
|
||||
token,
|
||||
)
|
||||
)
|
||||
g.db.commit()
|
||||
|
||||
|
||||
def time_to_expiry(form):
|
||||
if form['expires'] == None:
|
||||
return "Never"
|
||||
|
||||
time_to_go = form['expires'] - datetime.now(timezone.utc)
|
||||
time_to_go = ".".join(str(time_to_go).split('.')[0:-1])[0:-3]
|
||||
|
||||
#time_to_go.microseconds = 0
|
||||
return "%s (%s to go)"%( form['expires'], time_to_go )
|
||||
Reference in New Issue
Block a user