Files
abot/manager.py
2018-12-13 22:34:02 +02:00

287 lines
8.2 KiB
Python

#!/usr/bin/env python3
from datetime import datetime
from utils import *
import argparse
import os
import random
import sqlite3
import string
import sys
def insert_token(db, name, token, role):
    """Store one not-yet-answered token for a question set.

    The value written to the ``token`` column is ``get_hash(token)``, not
    the plain-text token. New rows always start with ``answered`` set to
    ``'false'``. The insert is committed before the function returns.

    Args:
        db: Open database connection providing ``cursor()`` and ``commit()``.
        name: Question set the token belongs to.
        token: Plain-text token string.
        role: Role stored alongside the token.
    """
    statement = """
INSERT INTO tokens (token, question_set, answered, role) VALUES (
?,
?,
'false',
?
);
"""
    cursor = db.cursor()
    cursor.execute(statement, (get_hash(token), name, role))
    db.commit()
def manage_tokens(options):
    """List the voter tokens of a question set, or generate new tokens.

    With ``options.list`` set, one line per stored token with role
    ``'voter'`` is printed as ``<name>:<stored token value> (used|unused)``
    and the function returns. Otherwise ``options.number`` new tokens are
    generated, stored through ``insert_token`` and printed as
    ``<prefix>/<service>/<name>/<token>``.

    Args:
        options: Parsed command line options; reads ``name``, ``db``,
            ``list``, ``role``, ``number`` and ``prefix``.

    Raises:
        Exception: If ``is_key`` rejects ``options.name``.
        ValueError: If ``options.role`` has no matching service when
            generating tokens.
    """
    # Function-scope import keeps this block self-contained. Tokens grant
    # access to a vote, so they are drawn from the OS CSPRNG via `secrets`
    # instead of the predictable `random` generator.
    import secrets

    if not is_key(options.name, options):
        raise Exception("%s does not exist, or is not a valid question set name"%( options.name, ))
    db = open_db(options.db)

    if options.list:
        cur = db.cursor()
        cur.execute(
            "SELECT token, answered FROM tokens WHERE question_set = ? AND role = 'voter'",
            (options.name,)
        )
        for stored_token, answered in cur:
            print("%s:%s (%s)" % (
                options.name,
                stored_token,
                "used" if answered == "true" else "unused",
            ))
        return

    # Resolve the URL path segment before inserting anything, so an
    # unexpected role cannot leave a stored token that is never printed.
    services = {'voter': 'vote', 'observer': 'observe'}
    if options.role not in services:
        raise ValueError("Unknown role: %s" % (options.role,))
    service = services[options.role]

    token_length = 32
    alphabet = string.ascii_uppercase + string.digits
    for _ in range(options.number):
        token = ''.join(secrets.choice(alphabet) for _ in range(token_length))
        insert_token(db, options.name, token, options.role)
        print("%s/%s/%s/%s" % (
            options.prefix,
            service,
            options.name,
            token,
        ))
def open_db(db):
    """Open the SQLite database located at ``db``.

    Args:
        db: Database path, handed unchanged to ``sqlite3.connect``.

    Returns:
        The connection object created by ``sqlite3.connect``.
    """
    connection = sqlite3.connect(db)
    return connection
def parse_options(database, questions):
    """Parse the command line (``sys.argv``) of the vote manager.

    The defaults for ``--db`` and ``--questions`` are built by joining
    ``database`` and ``questions`` onto the directory that contains this
    file. When no question set name is given, the available question sets
    are printed and the process exits with status 0.

    Args:
        database: Database file name used for the ``--db`` default.
        questions: Question folder name used for the ``--questions`` default.

    Returns:
        The ``argparse.Namespace`` holding ``db``, ``questions``, ``name``,
        ``subparser_name`` and the options of the selected sub-command.
    """
    path_self = os.path.dirname(os.path.realpath(__file__))
    default_db = os.path.join(path_self, database)
    default_questions = os.path.join(path_self, questions)

    parser = argparse.ArgumentParser(description='aBot vote manager')
    parser.add_argument(
        '--db',
        action="store",
        dest="db",
        default=default_db,
        help="Path to database [%(default)s]"
    )
    parser.add_argument(
        '--questions',
        action="store",
        dest="questions",
        default=default_questions,
        help="Path to question folder [%(default)s]"
    )
    parser.add_argument(
        dest="name",
        help="Name of the question set",
        nargs='?',
        default=None
    )
    subparsers = parser.add_subparsers(help='sub-command', dest='subparser_name')

    ## tokens
    parser_token = subparsers.add_parser('token', help="Manage tokens")
    parser_token.add_argument(
        '-n',
        action="store",
        dest="number",
        default=1,
        type=int,
        help="Number of tokens to generate"
    )
    parser_token.add_argument(
        '--prefix',
        action="store",
        dest="prefix",
        default="",
        help="Prefix tokens with the server URL to automate emails etc.."
    )
    parser_token.add_argument(
        '--role',
        action="store",
        dest="role",
        default="voter",
        choices=['voter', 'observer'],
        help="Add token for role. observer is a token that can only view the current status of the vote event."
    )
    parser_token.add_argument(
        '--list', '-l',
        action="store_true",
        dest="list",
        default=False,
        help="List existing tokens, instead of generating more"
    )

    ## summary of vote
    parser_summary = subparsers.add_parser('summary', help="Vote results")
    parser_summary.add_argument(
        '--tsv',
        action="store_true",
        dest="tsv",
        default=False,
        help="TSV output"
    )

    ## clear
    parser_clear_votes = subparsers.add_parser('clear-votes', help="Delete results")
    parser_clear_votes.add_argument(
        '--really',
        action="store_true",
        dest="really",
        default=False,
        help="Really delete results for the vote"
    )

    ## clear tokens
    parser_clear_tokens = subparsers.add_parser('clear-tokens', help="Delete tokens")
    parser_clear_tokens.add_argument(
        '--really',
        action="store_true",
        dest="really",
        default=False,
        help="Really delete tokens for the vote"
    )

    parsed = parser.parse_args()
    # Without a question set name there is nothing to operate on: show the
    # available names instead and stop.
    if parsed.name is None:
        print("Names of question sets:")
        list_question_sets(parsed)
        sys.exit(0)
    return parsed
def list_question_sets(options):
    """Print the name of every question set in ``options.questions``.

    Every directory entry whose name ends in ``.txt`` counts as a question
    set; the printed name is the file name without that suffix. Names are
    printed in sorted order because ``os.listdir`` returns entries in
    arbitrary order, which would make the listing differ between runs.

    Args:
        options: Object whose ``questions`` attribute is the folder to scan.
    """
    suffix = ".txt"
    for entry in sorted(os.listdir(options.questions)):
        if entry.endswith(suffix):
            print(entry[:-len(suffix)])
def summary(options):
    """Print the results of the question set named ``options.name``.

    The summary data and token counts are read from the database at
    ``options.db``. The output is rendered by ``summary_tsv`` when
    ``options`` carries a truthy ``tsv`` attribute, and by ``summary_list``
    otherwise.

    Args:
        options: Parsed command line options; reads ``name``, ``db`` and,
            when present, ``tsv``.

    Raises:
        Exception: If ``is_key`` rejects ``options.name``.
    """
    if not is_key(options.name, options):
        raise Exception("%s does not exist, or is not a valid question set name"%( options.name, ))

    db = open_db(options.db)
    questions, answers = get_summary(db, options.name)
    tokens = get_token_counts(db, options.name)

    # The `tsv` attribute only exists when the `summary` sub-command was parsed.
    render = summary_tsv if getattr(options, 'tsv', False) else summary_list
    print(render(questions, answers, tokens))
def summary_list(questions, answers, tokens):
    """Render the vote results as a human-readable text listing.

    Args:
        questions: Iterable of question keys, in output order.
        answers: Mapping from question key to a mapping holding
            ``'answer_type'`` and ``'answers'`` (answer text -> count).
        tokens: Mapping providing the ``used``, ``unused`` and ``total``
            values for the header.

    Returns:
        The listing as one string. For each question the answers are
        ordered by descending count. ``single`` and ``multiple`` answers
        show their count and integer percentage of the question's total;
        ``open`` answers are shown as quoted text without counts.
    """
    header = (
        "# Tokens for this question set:\n"
        "# used: {used}, unused: {unused}, total: {total}\n"
    ).format_map(tokens)
    parts = [header]

    # zip() stops at the shorter input, so no more questions are rendered
    # than `answers` has entries; only the question key itself is used.
    for q, _ in zip(questions, answers):
        entry = answers[q]
        answer_type = entry['answer_type']
        counts = entry['answers']
        total = sum(counts.values())
        parts.append("\n%s\n# Answers total: %d\n" % (q, total))

        # Negated count sorts descending while ties keep their original order.
        ranked = sorted(counts.items(), key=lambda item: -item[1])
        for text, count in ranked:
            prefix = ""
            postfix = ""
            if answer_type in ("single", "multiple"):
                prefix = "- " if answer_type == "single" else "+ "
                # A question whose options all have zero votes would
                # otherwise raise ZeroDivisionError; report 0% instead.
                percent = 100 * float(count) / total if total else 0
                postfix = ": %d (%d%%) " % (count, percent)
            elif answer_type == "open":
                prefix = "----\n> "
            parts.append("%s%s%s\n" % (prefix, text, postfix))

    return "".join(parts)
def summary_tsv(questions, answers, tokens):
    """Render the vote results as tab-separated text.

    Args:
        questions: Iterable of question keys, in output order.
        answers: Mapping from question key to a mapping holding
            ``'answer_type'`` and ``'answers'`` (answer text -> count).
        tokens: Mapping providing the ``used``, ``unused`` and ``total``
            values for the token rows.

    Returns:
        The table as one string: two token rows, a column header row, then
        for each question one row with its type and total count followed by
        one row per answer, ordered by descending count.
    """
    header = (
        'Tokens\tUsed\tUnused\tTotal\n'
        '""\t"{used}"\t"{unused}"\t"{total}"\n'
    ).format_map(tokens)
    rows = [header, '"Question"\t"Question type"\t"Answer"\t"Count"\n']

    # Translation table mapping code points 0-31 to None: removes tabs,
    # newlines and other control characters that would break the row layout.
    strip_control = dict.fromkeys(range(32))

    # zip() stops at the shorter input, so no more questions are rendered
    # than `answers` has entries; only the question key itself is used.
    for q, _ in zip(questions, answers):
        entry = answers[q]
        counts = entry['answers']
        total = sum(counts.values())
        rows.append('"%s"\t"%s"\t""\t"%d"\n' % (q, entry['answer_type'], total))

        # Negated count sorts descending while ties keep their original order.
        ranked = sorted(counts.items(), key=lambda item: -item[1])
        for text, count in ranked:
            # The answer is wrapped in quotes like every other column; the
            # previous format emitted `""answer` with the quotes misplaced.
            # NOTE(review): embedded double quotes are not escaped, same as
            # in the question column - confirm whether consumers need that.
            rows.append('""\t""\t"%s"\t"%d"\n' % (
                text.translate(strip_control),
                count,
            ))

    return "".join(rows)
def clear_votes(options):
    """Show the current results of a question set, then delete them on request.

    The summary is printed first; any exception raised while producing it
    is replaced by a short notice so that the deletion can still proceed.
    Unless ``options.really`` is set, nothing is deleted and the process
    exits with status 0. Otherwise all rows of ``answers`` belonging to
    ``options.name`` are deleted and the change is committed.

    Args:
        options: Parsed command line options; reads ``name``, ``db`` and
            ``really`` (plus whatever ``summary`` reads).
    """
    try:
        summary(options)
    except Exception:
        print("\nQuestions no longer available")

    confirmed = options.really
    if not confirmed:
        print("\nNot really deleting results. use --really")
        sys.exit(0)

    connection = open_db(options.db)
    cursor = connection.cursor()
    cursor.execute("DELETE FROM answers WHERE question_set = ?", (options.name,))
    connection.commit()
    print("\nDeleted votes for %s"%( options.name, ))
def clear_tokens(options):
    """Show the token counts of a question set, then delete its tokens on request.

    The result of ``get_token_counts`` is printed first. Unless
    ``options.really`` is set, nothing is deleted and the process exits
    with status 0. Otherwise all rows of ``tokens`` belonging to
    ``options.name`` are deleted and the change is committed.

    Args:
        options: Parsed command line options; reads ``name``, ``db`` and
            ``really``.
    """
    # One connection serves both the count and the delete; previously a
    # second connection was opened and the first one was left abandoned.
    # NOTE(review): assumes get_token_counts leaves the connection open - confirm.
    db = open_db(options.db)
    print(get_token_counts(db, options.name))
    if not options.really:
        print("\nNot really deleting tokens. use --really")
        sys.exit(0)
    cur = db.cursor()
    cur.execute(
        "DELETE FROM tokens WHERE question_set = ?",
        (options.name,)
    )
    db.commit()
    print("\nDeleted tokens for %s"%( options.name, ))
def main(database, questions):
    """Parse the command line and run the selected sub-command.

    Nothing is run when the parsed ``subparser_name`` matches none of the
    known sub-commands.

    Args:
        database: Database file name forwarded to ``parse_options``.
        questions: Question folder name forwarded to ``parse_options``.
    """
    options = parse_options(database, questions)
    handlers = {
        "token": manage_tokens,
        "summary": summary,
        "clear-votes": clear_votes,
        "clear-tokens": clear_tokens,
    }
    handler = handlers.get(options.subparser_name)
    if handler is not None:
        handler(options)