#!/usr/bin/env python3
"""Command-line manager for vote events: tokens, result summaries, cleanup.

The helpers ``get_hash``, ``is_key``, ``get_summary`` and
``get_token_counts`` are not defined in this file; they are expected to come
from the wildcard ``utils`` import below.
"""
from datetime import datetime
# NOTE: the wildcard import is kept first so that the explicit imports below
# win over any same-named symbol exported by ``utils``.
from utils import *
import argparse
import os
import random
import sqlite3
import string
import sys
import secrets
from contextlib import closing

# Length and alphabet of generated tokens.
_TOKEN_LENGTH = 32
_TOKEN_ALPHABET = string.ascii_uppercase + string.digits

# URL path segment printed for each token role.
_ROLE_SERVICES = {'voter': 'vote', 'observer': 'observe'}


def insert_token(db, name, token, role):
    """Insert a new, unanswered token for a question set and commit.

    Only ``get_hash(token)`` is written to the ``tokens`` table, never the
    plain token itself.

    Args:
        db: Open sqlite3 connection.
        name: Question set the token belongs to.
        token: Plain-text token.
        role: Role stored alongside the token.
    """
    cur = db.cursor()
    cur.execute(
        """
        INSERT INTO tokens (token, question_set, answered, role)
        VALUES ( ?, ?, 'false', ? );
        """,
        (get_hash(token), name, role),
    )
    db.commit()


def manage_tokens(options):
    """List the voter tokens of a question set, or generate new tokens.

    With ``options.list`` set, prints one line per stored voter token (the
    value stored in the database, i.e. what ``insert_token`` wrote) together
    with its used/unused state. Otherwise generates ``options.number`` random
    tokens, stores them and prints each as
    ``prefix/service/name/token``.

    Args:
        options: Parsed command-line options (``name``, ``db``, ``list``,
            ``role``, ``number``, ``prefix``).

    Raises:
        Exception: If ``options.name`` is rejected by ``is_key``.
        ValueError: If ``options.role`` has no known service path.
    """
    if not is_key(options.name, options):
        raise Exception("%s does not exist, or is not a valid question set name"%(
            options.name,
        ))

    # closing() guarantees the connection is released on every exit path.
    with closing(open_db(options.db)) as db:
        if options.list:
            cur = db.cursor()
            cur.execute(
                "SELECT token, answered FROM tokens WHERE question_set = ? AND role = 'voter'",
                (options.name,),
            )
            for stored_token, answered in cur:
                print("%s:%s (%s)"%(
                    options.name,
                    stored_token,
                    "used" if answered == "true" else "unused",
                ))
            return

        # Validate the role before touching the database, so an unexpected
        # role cannot leave a stored token whose URL was never printed.
        service = _ROLE_SERVICES.get(options.role)
        if service is None:
            raise ValueError("unknown role: %s" % (options.role,))

        for _ in range(options.number):
            # Tokens authorise voting, so draw them from the OS CSPRNG
            # (secrets) rather than the predictable ``random`` generator.
            token = ''.join(
                secrets.choice(_TOKEN_ALPHABET) for _ in range(_TOKEN_LENGTH)
            )
            insert_token(db, options.name, token, options.role)
            print("%s/%s/%s/%s"%(options.prefix, service, options.name, token))


def open_db(db):
    """Open and return a sqlite3 connection to the database file ``db``."""
    return sqlite3.connect(db)


def parse_options(database, questions):
    """Parse the command line and return the argparse namespace.

    ``database`` and ``questions`` are resolved relative to the directory
    containing this script and used as defaults for ``--db`` and
    ``--questions``. If no question set name is given, the available
    question sets are listed and the process exits with status 0.

    Args:
        database: Default database file name, relative to this script.
        questions: Default question folder name, relative to this script.

    Returns:
        The parsed ``argparse.Namespace``.
    """
    path_self = os.path.realpath(os.path.dirname(os.path.realpath(__file__)))
    default_db = os.path.join(path_self, database)
    default_questions = os.path.join(path_self, questions)

    parser = argparse.ArgumentParser(description='aBot vote manager')
    parser.add_argument(
        '--db',
        action="store",
        dest="db",
        default=default_db,
        help="Path to database [%(default)s]",
    )
    parser.add_argument(
        '--questions',
        action="store",
        dest="questions",
        default=default_questions,
        help="Path to question folder [%(default)s]",
    )
    parser.add_argument(
        dest="name",
        help="Name of the question set",
        nargs='?',
        default=None,
    )

    subparsers = parser.add_subparsers(help='sub-command', dest='subparser_name')

    ## tokens
    parser_token = subparsers.add_parser('token', help="Manage tokens")
    parser_token.add_argument(
        '-n',
        action="store",
        dest="number",
        default=1,
        type=int,
        help="Number of tokens to generate",
    )
    parser_token.add_argument(
        '--prefix',
        action="store",
        dest="prefix",
        default="",
        help="Prefix tokens with the server URL to automate emails etc..",
    )
    parser_token.add_argument(
        '--role',
        action="store",
        dest="role",
        default="voter",
        choices=['voter', 'observer'],
        help="Add token for role. observer is a token that can only view the current status of the vote event.",
    )
    parser_token.add_argument(
        '--list', '-l',
        action="store_true",
        dest="list",
        default=False,
        help="List existing tokens, instead of generating more",
    )

    ## summary of vote
    parser_summary = subparsers.add_parser('summary', help="Vote results")
    parser_summary.add_argument(
        '--tsv',
        action="store_true",
        dest="tsv",
        default=False,
        help="TSV output",
    )

    ## clear
    parser_clear_votes = subparsers.add_parser('clear-votes', help="Delete results")
    parser_clear_votes.add_argument(
        '--really',
        action="store_true",
        dest="really",
        default=False,
        help="Really delete results for the vote",
    )

    ## clear tokens
    parser_clear_tokens = subparsers.add_parser('clear-tokens', help="Delete tokens")
    parser_clear_tokens.add_argument(
        '--really',
        action="store_true",
        dest="really",
        default=False,
        help="Really delete tokens for the vote",
    )

    parsed = parser.parse_args()

    # Without a question set name there is nothing to act on: show the
    # available names and stop.
    if parsed.name is None:
        print("Names of question sets:")
        list_question_sets(parsed)
        sys.exit(0)

    return parsed


def list_question_sets(options):
    """Print the name of every ``.txt`` file in ``options.questions``.

    The ``.txt`` extension is stripped from the printed names.
    """
    for filename in os.listdir(options.questions):
        if not filename.endswith(".txt"):
            continue
        print(filename[0:-4])


def summary(options):
    """Print the vote summary for ``options.name``.

    Output is tab-separated when ``options.tsv`` is present and true,
    otherwise a human-readable list.

    Raises:
        Exception: If ``options.name`` is rejected by ``is_key``.
    """
    if not is_key(options.name, options):
        raise Exception("%s does not exist, or is not a valid question set name"%(
            options.name,
        ))

    # Formatting happens while the connection is still open, in case the
    # helper results depend on it.
    with closing(open_db(options.db)) as db:
        questions, answers = get_summary(db, options.name)
        tokens = get_token_counts(db, options.name)
        if getattr(options, 'tsv', False):
            out = summary_tsv(questions, answers, tokens)
        else:
            out = summary_list(questions, answers, tokens)
        print(out)


def _ranked_answers(entry):
    """Return ``(total, pairs)`` for one question entry.

    ``pairs`` is the list of ``(answer, count)`` tuples from
    ``entry['answers']`` sorted by descending count (ties keep their
    original order); ``total`` is the sum of all counts.
    """
    counts = entry['answers']
    total = sum(counts.values())
    pairs = sorted(counts.items(), key=lambda pair: -pair[1])
    return total, pairs


def summary_list(questions, answers, tokens):
    """Render the vote results as a human-readable text report.

    Args:
        questions: Iterable of question keys.
        answers: Mapping ``question -> {'answers': {answer: count},
            'answer_type': str}``.
        tokens: Mapping with ``used``, ``unused`` and ``total`` entries.

    Returns:
        The report as a single string. Answers of type ``single`` and
        ``multiple`` are shown with their count and share of the question's
        total; ``open`` answers are shown verbatim.
    """
    parts = [
        "# Tokens for this question set:\n"
        "# used: {used}, unused: {unused}, total: {total}\n".format_map(tokens)
    ]

    # zip() is kept so that, as before, iteration stops at the shorter of
    # the two collections.
    for q, _ in zip(questions, answers):
        entry = answers[q]
        sum_answers, ranked = _ranked_answers(entry)
        parts.append("\n%s\n# Answers total: %d\n"%(q, sum_answers))

        answer_type = entry['answer_type']
        for text, count in ranked:
            prefix = ""
            postfix = ""
            if answer_type in ("single", "multiple"):
                prefix = "- " if answer_type == "single" else "+ "
                # Guard against a question whose answers all have zero
                # votes, which previously raised ZeroDivisionError.
                percent = 100 * float(count)/sum_answers if sum_answers else 0
                postfix = ": %d (%d%%) "%(count, percent)
            elif answer_type == "open":
                prefix = "----\n> "
            parts.append("%s%s%s\n"%(prefix, text, postfix))

    return "".join(parts)


def summary_tsv(questions, answers, tokens):
    """Render the vote results as tab-separated text.

    Args:
        questions: Iterable of question keys.
        answers: Mapping ``question -> {'answers': {answer: count},
            'answer_type': str}``.
        tokens: Mapping with ``used``, ``unused`` and ``total`` entries.

    Returns:
        The table as a single string: a token-count header, a column header,
        then one row per question followed by one row per answer.
    """
    parts = [
        'Tokens\tUsed\tUnused\tTotal\n'
        '""\t"{used}"\t"{unused}"\t"{total}"\n'.format_map(tokens),
        '"Question"\t"Question type"\t"Answer"\t"Count"\n',
    ]

    # Translation table deleting ASCII control characters (code points
    # 0-31), so tabs/newlines inside an answer cannot break the row layout.
    strip_control = dict.fromkeys(range(32))

    for q, _ in zip(questions, answers):
        entry = answers[q]
        sum_answers, ranked = _ranked_answers(entry)
        parts.append('"%s"\t"%s"\t""\t"%d"\n'%(
            q,
            entry['answer_type'],
            sum_answers,
        ))
        for text, count in ranked:
            # The answer cell is quoted like every other cell; the previous
            # format emitted a stray empty pair of quotes before the text.
            parts.append('""\t""\t"%s"\t"%d"\n'%(
                text.translate(strip_control),
                count,
            ))

    return "".join(parts)


def clear_votes(options):
    """Delete all recorded answers for ``options.name``.

    A summary is printed first as a last look at the data. Nothing is
    deleted unless ``options.really`` is set; in that case the process
    exits with status 0 after printing a hint.
    """
    try:
        summary(options)
    except Exception:
        # Best effort: the question set may already have been removed while
        # its answers are still in the database.
        print("\nQuestions no longer available")

    if not options.really:
        print("\nNot really deleting results. use --really")
        sys.exit(0)

    with closing(open_db(options.db)) as db:
        cur = db.cursor()
        cur.execute(
            "DELETE FROM answers WHERE question_set = ?",
            (options.name,),
        )
        db.commit()
    print("\nDeleted votes for %s"%(options.name,))


def clear_tokens(options):
    """Delete all tokens for ``options.name``.

    The current token counts are printed first. Nothing is deleted unless
    ``options.really`` is set; in that case the process exits with status 0
    after printing a hint.
    """
    # One connection serves both the count and the delete; previously a
    # second connection was opened and the first one leaked.
    with closing(open_db(options.db)) as db:
        print(get_token_counts(db, options.name))

        if not options.really:
            print("\nNot really deleting tokens. use --really")
            sys.exit(0)

        cur = db.cursor()
        cur.execute(
            "DELETE FROM tokens WHERE question_set = ?",
            (options.name,),
        )
        db.commit()
    print("\nDeleted tokens for %s"%(options.name,))


def main(database, questions):
    """Parse the command line and run the selected sub-command.

    Args:
        database: Default database file name, relative to this script.
        questions: Default question folder name, relative to this script.
    """
    options = parse_options(database, questions)

    handlers = {
        "token": manage_tokens,
        "summary": summary,
        "clear-votes": clear_votes,
        "clear-tokens": clear_tokens,
    }
    # No sub-command (or an unknown one) is silently a no-op, as before.
    handler = handlers.get(options.subparser_name)
    if handler is not None:
        handler(options)