"""Helpers for reading question forms from disk and storing votes in SQLite."""

from datetime import datetime, timezone
import html
import os
import sqlite3

from flask import current_app as app
from flask import g
from werkzeug.utils import secure_filename


def connect_db():
    """Open a connection to the SQLite file named by ``app.config['DATABASE']``."""
    return sqlite3.connect(app.config['DATABASE'])


def create_db(db_file):
    """Create the ``answers`` and ``tokens`` tables in *db_file* if missing.

    The connection is opened only for the duration of this call and is
    always closed again, even when table creation fails.
    """
    db = sqlite3.connect(db_file)
    try:
        cur = db.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS answers (
                question_set TEXT,
                question TEXT,
                answer TEXT,
                answer_type TEXT
            );
        """)
        cur.execute("""
            CREATE TABLE IF NOT EXISTS tokens (
                token TEXT PRIMARY KEY,
                question_set TEXT,
                answered BOOLEAN
            );
        """)
        db.commit()
    finally:
        db.close()


def get_token_counts(db, key):
    """Count used and unused tokens of question set *key*.

    Returns a dict with the keys ``'unused'``, ``'used'`` and ``'total'``.
    """
    cur = db.cursor()
    cur.execute(
        "SELECT count(*) FROM tokens WHERE answered = 'true' and question_set = ?",
        (key,)
    )
    used_tokens = cur.fetchone()[0]
    cur.execute(
        "SELECT count(*) FROM tokens WHERE answered = 'false' and question_set = ?",
        (key,)
    )
    unused_tokens = cur.fetchone()[0]
    return {
        'unused': unused_tokens,
        'used': used_tokens,
        'total': used_tokens + unused_tokens,
    }


def get_summary(db, key):
    """Return the answer tally for question set *key*.

    Returns a tuple ``(questions, answers)``: ``questions`` lists the
    question names in the order they are first seen in the result rows,
    and ``answers`` maps each question name to
    ``{'answers': {answer_text: count}, 'answer_type': ...}``.
    """
    questions = []
    answers = {}
    cur = db.cursor()
    cur.execute(
        "SELECT question, answer, answer_type FROM answers WHERE question_set = ?",
        (key,)
    )
    for question, answer, answer_type in cur:
        if question not in answers:
            questions.append(question)
            # The answer type is taken from the first row seen for a question.
            answers[question] = {'answers': {}, 'answer_type': answer_type}
        tally = answers[question]['answers']
        tally[answer] = tally.get(answer, 0) + 1
    return questions, answers


def has_voted(key, token):
    """Return True if *token* is marked as answered for question set *key*.

    A missing token (``None``) is reported as already voted.
    """
    if token is None:
        return True
    cur = g.db.cursor()
    cur.execute(
        "SELECT token FROM tokens WHERE token = ? AND answered = 'true' AND question_set = ?",
        (token, key)
    )
    return cur.fetchone() is not None


def is_closed_vote(form):
    """Return True if the parsed *form* uses the ``closed`` vote style."""
    return form['vote_style'] == 'closed'


def is_draft(form):
    """Return the ``draft`` flag of the parsed *form*."""
    return form['draft']


def is_expired(form):
    """Return True if the form has an expiry time and it has passed."""
    if form['expires'] is None:
        return False
    return datetime.now(timezone.utc) > form['expires']


def is_key(key, cli_opts=False):
    """Return True if a question file ``<key>.txt`` exists.

    The directory is ``cli_opts.questions`` when *cli_opts* is given,
    otherwise ``app.config['QUESTIONS']``. *key* is passed through
    ``secure_filename`` before it is used as a file name.
    """
    key = secure_filename(key)
    if cli_opts:
        root_path = cli_opts.questions
    else:
        root_path = app.config['QUESTIONS']
    return os.path.exists(os.path.join(root_path, key + ".txt"))


def is_show_results(form):
    """Return the ``show_results`` flag of the parsed *form*."""
    return form['show_results']


def parse_form(key):
    """Parse the question file ``<key>.txt`` into a form dict.

    Recognised lines (directives are matched case-insensitively):

    * blank lines and lines starting with ``#`` are ignored
    * ``expires: <date>``, ``draft: true``, ``vote_style: open``,
      ``show_results: true`` set the corresponding form options
    * ``- text`` adds a choice and ``+ text`` adds a multi-choice to the
      most recent question; such lines are ignored before the first question
    * any other line starts a new question; a line ending in ``___`` marks
      an open question, and a line starting with ``<`` disables autoformat

    Returns the form dict, or ``False`` when reading or parsing fails
    (the exception is re-raised instead when ``app.config['DEBUG']`` is set).
    """
    form = {
        'expires': None,
        'draft': False,
        'vote_style': "closed",
        'show_results': False,
        'questions': []
    }
    key = secure_filename(key)
    try:
        # None until the first question line has been seen, so that choice
        # lines appearing too early are skipped instead of raising.
        current_question = None
        with open(os.path.join(app.config['QUESTIONS'], key + ".txt"), "rt") as fp:
            for row in fp:
                if row.strip() == "":
                    continue
                rowsl = row.lower().rstrip()
                if row.startswith("#"):
                    continue
                if rowsl.startswith("expires: "):
                    form['expires'] = parse_row_date(row)
                    continue
                if rowsl.startswith("draft: "):
                    if rowsl == "draft: true":
                        form['draft'] = True
                    continue
                if rowsl.startswith("vote_style: "):
                    if rowsl == "vote_style: open":
                        form['vote_style'] = "open"
                    continue
                if rowsl.startswith("show_results: "):
                    if rowsl == "show_results: true":
                        form['show_results'] = True
                    continue
                if row.startswith("- "):
                    if current_question is not None:
                        form['questions'][current_question]['choices'].append(
                            row[2:].strip()
                        )
                    continue
                if row.startswith("+ "):
                    if current_question is not None:
                        form['questions'][current_question]['multichoices'].append(
                            row[2:].strip()
                        )
                    continue
                # Anything else is the title line of a new question.
                current_question = len(form['questions'])
                form['questions'].append({
                    'choices': [],
                    'multichoices': [],
                    'index': current_question + 1,
                    'name': row.strip().rstrip("_:").rstrip(),
                    'open_question': row.strip().endswith("___"),
                    'autoformat': not rowsl.startswith("<")
                })
        return form
    except Exception:
        if app.config['DEBUG']:
            raise
        return False


def parse_row_date(row):
    """Parse the date of an ``expires: ...`` line.

    The first nine characters (the ``expires: `` prefix) are dropped. The
    value ``none`` (any case) yields ``None``; otherwise the text must match
    ``%Y-%m-%d %H:%M %z``. An unparsable date yields ``None``, unless
    ``app.config['DEBUG']`` is set, in which case the value is printed and
    the exception re-raised.
    """
    row = row[9:].strip()
    if row.lower() == "none":
        return None
    try:
        return datetime.strptime(row, '%Y-%m-%d %H:%M %z')
    except Exception:
        if app.config['DEBUG']:
            print(row)
            raise
        return None


def write_vote(key, token, answers, form):
    """Store the submitted *answers* for question set *key* and commit.

    For each question in *form* the submitted fields ``QC<index>`` (single
    choice), ``QM<index>`` (multiple choice, read with ``getlist``) and
    ``QO<index>`` (open answer) are checked in that order; when several are
    present the last one checked wins. Answers are HTML-escaped and stripped
    before being inserted. For closed votes *token* is marked as answered.
    """
    cur = g.db.cursor()
    for question in form['questions']:
        index = question['index']
        single_field = 'QC%d' % (index,)
        multi_field = 'QM%d' % (index,)
        open_field = 'QO%d' % (index,)

        answer = None
        answer_type = None
        if single_field in answers:
            answer = (answers[single_field],)
            answer_type = "single"
        if multi_field in answers:
            answer = answers.getlist(multi_field)
            answer_type = "multiple"
        if open_field in answers:
            answer = (answers[open_field],)
            answer_type = "open"
        if answer is None or answer_type is None:
            continue

        for single in answer:
            cur.execute(
                "INSERT INTO answers (question_set, question, answer, answer_type) VALUES (?, ?, ?, ?)",
                (key, question['name'], html.escape(single).strip(), answer_type)
            )
    if is_closed_vote(form):
        cur.execute(
            "UPDATE tokens SET answered = 'true' WHERE token = ? AND question_set = ?",
            (token, key)
        )
    g.db.commit()


def sort_summary(questions, answers):
    """Turn the tallies from ``get_summary`` into lists sorted by count.

    Returns ``(questions, sorted_answer_list)``, where the second item holds
    one list per question. Each list contains dicts with the keys
    ``answer_type``, ``answer``, ``count`` and ``percent`` (integer share of
    that question's answers), ordered by descending count.
    """
    sorted_answer_list = []
    # zip() keeps the iteration bounded by the shorter of the two arguments.
    for q, _ in zip(questions, answers):
        tally = answers[q]['answers']
        sum_answers = sum(tally.values())
        sorted_answers = sorted(
            [
                {
                    "answer_type": answers[q]['answer_type'],
                    "answer": x,
                    "count": tally[x],
                    "percent": int(100 * float(tally[x]) / sum_answers),
                }
                for x in tally
            ],
            key=lambda i: -i['count']
        )
        sorted_answer_list.append(sorted_answers)
    return questions, sorted_answer_list


def time_to_expiry(form):
    """Describe when the form expires and how much time is left.

    Returns ``"Never"`` when the form has no expiry time, otherwise
    ``"<expires> (<remaining> to go)"`` with the remaining time shown
    without seconds.
    """
    if form['expires'] is None:
        return "Never"
    time_to_go = form['expires'] - datetime.now(timezone.utc)
    # str(timedelta) only has a ".ffffff" part when microseconds are non-zero;
    # partition() copes with both forms. The final [:-3] removes ":SS".
    time_to_go = str(time_to_go).partition('.')[0][:-3]
    return "%s (%s to go)" % (form['expires'], time_to_go)