396 lines
11 KiB
Python
396 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from collections import OrderedDict
|
|
from datetime import datetime
|
|
from utils import *
|
|
import argparse
|
|
import os
|
|
import random
|
|
import sqlite3
|
|
import string
|
|
import sys
|
|
|
|
|
|
def insert_token(db, name, token, role):
    """Store a new, not-yet-answered token row for a question set.

    db    -- open database connection (must provide cursor() and commit())
    name  -- question set the token belongs to
    token -- clear-text token; only get_hash(token) is written to the table
    role  -- value stored in the ``role`` column

    The ``answered`` column is always initialised to the string 'false'.
    The change is committed before returning.
    """
    statement = """
    INSERT INTO tokens (token, question_set, answered, role) VALUES (
    ?,
    ?,
    'false',
    ?
    );
    """
    values = (get_hash(token), name, role)
    db.cursor().execute(statement, values)
    db.commit()
|
|
|
|
|
|
def manage_tokens(options):
    """List the voter tokens of a question set, or generate new tokens.

    options -- parsed command line options; reads ``name``, ``db``, ``list``,
               ``role``, ``number`` and ``prefix``.

    With ``options.list`` set, prints one line per stored token with role
    'voter' ("<name>:<stored token> (used|unused)") and returns.  The value
    printed is whatever is stored in the ``token`` column (insert_token
    stores get_hash(token), not the clear-text token).

    Otherwise generates ``options.number`` random 32-character tokens,
    stores each one through insert_token and prints it as
    "<prefix>/<service>/<name>/<token>".

    Raises Exception if the question set name is not valid or the role has
    no matching service.
    """
    if not is_key(options.name, options):
        raise Exception("%s does not exist, or is not a valid question set name"%( options.name, ))
    db = open_db(options.db)

    if options.list:
        cur = db.cursor()
        cur.execute(
            "SELECT token, answered FROM tokens WHERE question_set = ? AND role = 'voter'",
            (
                options.name,
            )
        )
        for row in cur:
            print("%s:%s (%s)"%(
                options.name,
                row[0],
                "used" if row[1] == "true" else "unused"
            ))
        return

    # URL path component used for each role.
    services = {
        'voter': 'vote',
        'observer': 'observe',
    }
    if options.role not in services:
        # Previously an unknown role surfaced as an unbound-local error
        # on first use of `service`; fail with a readable message instead.
        raise Exception("%s is not a valid role"%( options.role, ))
    service = services[options.role]

    token_length = 32
    alphabet = string.ascii_uppercase + string.digits
    # Tokens grant access to a vote, so draw them from the operating
    # system's CSPRNG rather than the predictable default generator.
    rng = random.SystemRandom()

    for _ in range(options.number):
        token = ''.join(rng.choice(alphabet) for _ in range(token_length))
        insert_token(db, options.name, token, options.role)
        print("%s/%s/%s/%s"%(
            options.prefix,
            service,
            options.name,
            token
        ))
|
|
|
|
|
|
def open_db(db):
    """Open and return a sqlite3 connection to the database at path ``db``."""
    connection = sqlite3.connect(db)
    return connection
|
|
|
|
|
|
def parse_options(database, questions):
    """Build the command line parser, parse sys.argv and return the result.

    database  -- database file name, resolved relative to this script's
                 directory to form the default for --db
    questions -- question folder name, resolved relative to this script's
                 directory to form the default for --questions

    Returns the argparse namespace.  ``subparser_name`` holds the chosen
    sub-command ('token', 'show', 'list', 'summary', 'clear-votes' or
    'clear-tokens').

    If no question set name was given, the available question sets are
    printed and the process exits with status 0.
    """
    path_self = os.path.realpath(
        os.path.dirname(
            os.path.realpath(__file__)
        )
    )
    default_db = os.path.join(path_self, database)
    default_questions = os.path.join(path_self, questions)

    parser = argparse.ArgumentParser(description='aBot vote manager')
    parser.add_argument(
        '--db',
        action="store",
        dest="db",
        default=default_db,
        help="Path to database [%(default)s]"
    )
    parser.add_argument(
        '--questions',
        action="store",
        dest="questions",
        default=default_questions,
        help="Path to question folder [%(default)s]"
    )
    parser.add_argument(
        dest="name",
        help="Name of the question set",
        nargs='?',
        default=None
    )

    subparsers = parser.add_subparsers(help='sub-command', dest='subparser_name')

    ## tokens
    parser_token = subparsers.add_parser('token', help="Manage tokens")
    parser_token.add_argument(
        '-n',
        action="store",
        dest="number",
        default=1,
        type=int,
        help="Number of tokens to generate"
    )
    parser_token.add_argument(
        '--prefix',
        action="store",
        dest="prefix",
        default="",
        help="Prefix tokens with the server URL to automate emails etc.."
    )
    parser_token.add_argument(
        '--role',
        action="store",
        dest="role",
        default="voter",
        choices=['voter', 'observer'],
        help="Add token for role. observer is a token that can only view the current status of the vote event."
    )
    parser_token.add_argument(
        '--list', '-l',
        action="store_true",
        dest="list",
        default=False,
        help="List existing tokens, instead of generating more"
    )

    ## show question set (takes no options of its own)
    subparsers.add_parser('show', help="Show a question set")

    ## list votes / summary of vote: both accept --tsv
    for command, description in (
        ('list', "List all votes"),
        ('summary', "Vote results"),
    ):
        subparsers.add_parser(command, help=description).add_argument(
            '--tsv',
            action="store_true",
            dest="tsv",
            default=False,
            help="TSV output"
        )

    ## clear votes / clear tokens: both require --really to actually delete
    for command, description, really_help in (
        ('clear-votes', "Delete results", "Really delete results for the vote"),
        ('clear-tokens', "Delete tokens", "Really delete tokens for the vote"),
    ):
        subparsers.add_parser(command, help=description).add_argument(
            '--really',
            action="store_true",
            dest="really",
            default=False,
            help=really_help
        )

    parsed = parser.parse_args()
    # Identity test: None is a singleton, `== None` can be fooled by __eq__.
    if parsed.name is None:
        print("Names of question sets:")
        list_question_sets(parsed)
        sys.exit(0)
    return parsed
|
|
|
|
|
|
def list_question_sets(options):
    """Print the name of every question set found in ``options.questions``.

    A question set is any directory entry ending in ".txt"; the name
    printed is the entry without that suffix.  Other entries are ignored.
    """
    suffix = ".txt"
    names = [
        entry[:-len(suffix)]
        for entry in os.listdir(options.questions)
        if entry.endswith(suffix)
    ]
    for name in names:
        print(name)
|
|
|
|
|
|
def list_votes(options):
    """Print every recorded answer of a question set, grouped by question.

    Reads (question, answer, answer_type) rows from the ``answers`` table
    for ``options.name`` and groups them per question in first-seen order.
    Output is produced by votes_tsv when ``options.tsv`` is set, otherwise
    by votes_list.

    Raises Exception if the question set name is not valid.
    """
    if not is_key(options.name, options):
        raise Exception("%s does not exist, or is not a valid question set name"%( options.name, ))

    db = open_db(options.db)
    cur = db.cursor()
    cur.execute(
        "SELECT question, answer, answer_type FROM answers WHERE question_set = ?",
        (options.name,)
    )

    votes = OrderedDict()
    for question, answer, answer_type in cur:
        if question not in votes:
            # The answer type of the first row seen for a question wins.
            votes[question] = {
                'answers': [],
                'answer_type': answer_type
            }
        votes[question]['answers'].append(answer)

    formatter = votes_tsv if getattr(options, 'tsv', False) else votes_list
    print(formatter(votes))
|
|
|
|
|
|
def show(options):
    """Print the settings and questions of a question set.

    The form is loaded with parse_form; its 'expires' and 'opens' values
    are converted with time_to and shown as the end and start of voting.
    Each question is then printed with its single choices ("- "), its
    multiple choices ("+ ") and, for open questions, an empty text box.

    Raises Exception if the question set name is not valid.
    """
    if not is_key(options.name, options):
        raise Exception("%s does not exist, or is not a valid question set name"%( options.name, ))

    form = parse_form(options.name, options.questions, True)
    form['ends'] = time_to('expires', form)
    form['starts'] = time_to('opens', form)

    header = (
        "\n"
        "Voting ends at: {ends}\n"
        "Voting starts at: {starts}\n"
        "Style (open/closed): {vote_style}\n"
        "Show results after voting: {show_results}\n"
        "Title: {title}\n"
        "Draft: {draft}\n"
    )
    print(header.format_map(form))

    text_box = (
        ",----------.",
        "| |",
        "`----------'",
    )
    for question in form['questions']:
        print("\n# {name}".format_map(question))
        for single in question['choices']:
            print("- " + single)
        for multi in question['multichoices']:
            print("+ " + multi)
        if question['open_question']:
            for box_line in text_box:
                print(box_line)
|
|
|
|
|
|
def summary(options):
    """Print the aggregated results and token usage of a question set.

    Results come from get_summary and token counts from get_token_counts.
    Output is produced by summary_tsv when ``options.tsv`` is set,
    otherwise by summary_list.

    Raises Exception if the question set name is not valid.
    """
    if not is_key(options.name, options):
        raise Exception("%s does not exist, or is not a valid question set name"%( options.name, ))

    db = open_db(options.db)
    questions, answers = get_summary(db, options.name)
    tokens = get_token_counts(db, options.name)

    render = summary_tsv if getattr(options, 'tsv', False) else summary_list
    print(render(questions, answers, tokens))
|
|
|
|
|
|
def summary_list(questions, answers, tokens):
    """Render vote results as a human-readable list.

    questions -- iterable of question keys
    answers   -- mapping: question -> {'answers': {answer: count},
                 'answer_type': 'single' | 'multiple' | 'open'}
    tokens    -- mapping with 'used', 'unused' and 'total' entries

    Returns a string: a token usage header, then for each question its
    total answer count and its answers sorted by descending count.
    Single ("- ") and multiple ("+ ") choice answers show their count and
    percentage of the question total; open answers are shown as quoted
    text.  A question whose counts are all zero reports 0% for every
    answer instead of failing with a division by zero.
    """
    prefixes = {
        "single": "- ",
        "multiple": "+ ",
        "open": "----\n> ",
    }
    parts = [
        "# Tokens for this question set:\n"
        "# used: {used}, unused: {unused}, total: {total}\n".format_map(tokens)
    ]

    # zip() is kept so that iteration stops at the shorter of the two
    # inputs, exactly as before; only the question key is used.
    for q, _ in zip(questions, answers):
        counts = answers[q]['answers']
        sum_answers = sum(counts.values())
        parts.append("\n%s\n# Answers total: %d\n"%( q, sum_answers, ))

        # Stable sort: answers with equal counts keep their original order.
        ranked = sorted(counts.items(), key=lambda item: -item[1])
        for text, count in ranked:
            answer_type = answers[q]['answer_type']
            prefix = prefixes.get(answer_type, "")
            postfix = ""
            if answer_type in ("single", "multiple"):
                # Guard against a question where every count is zero.
                if sum_answers:
                    percent = 100 * float(count) / sum_answers
                else:
                    percent = 0
                postfix = ": %d (%d%%) "%( count, percent )
            parts.append("%s%s%s\n"%( prefix, text, postfix ))

    return "".join(parts)
|
|
|
|
|
|
def summary_tsv(questions, answers, tokens):
    """Render vote results as tab-separated values.

    questions -- iterable of question keys
    answers   -- mapping: question -> {'answers': {answer: count},
                 'answer_type': ...}
    tokens    -- mapping with 'used', 'unused' and 'total' entries

    Returns a string: a token usage header and row, a column header, then
    for each question one row with its type and total answer count,
    followed by one row per answer (sorted by descending count) holding
    the quoted answer text and its count.  ASCII control characters
    (code points 0-31) are removed from answer text so that embedded tabs
    or newlines cannot break the row layout.
    """
    parts = [
        'Tokens\tUsed\tUnused\tTotal\n'
        '""\t"{used}"\t"{unused}"\t"{total}"\n'.format_map(tokens),
        '"Question"\t"Question type"\t"Answer"\t"Count"\n',
    ]
    # Translation table mapping every control character to None (delete).
    strip_control = dict.fromkeys(range(32))

    # zip() is kept so that iteration stops at the shorter of the two
    # inputs, exactly as before; only the question key is used.
    for q, _ in zip(questions, answers):
        counts = answers[q]['answers']
        sum_answers = sum(counts.values())
        parts.append('"%s"\t"%s"\t""\t"%d"\n'%( q, answers[q]['answer_type'], sum_answers, ))

        # Stable sort: answers with equal counts keep their original order.
        ranked = sorted(counts.items(), key=lambda item: -item[1])
        for text, count in ranked:
            # The answer belongs inside its quotes; the old format
            # string emitted `""answer`, producing a malformed cell.
            parts.append('""\t""\t"%s"\t"%d"\n'%(
                text.translate(strip_control),
                count,
            ))

    return "".join(parts)
|
|
|
|
|
|
def votes_list(votes):
    """Render individual votes in list format.

    votes -- mapping: question -> {'answers': [answer, ...],
             'answer_type': ...}

    Returns a string with a "# question (type)" heading per question and
    one line per answer, prefixed "- " for single, "+ " for multiple and
    "----\\n> " for open questions (no prefix for any other type).
    """
    prefix_for = {
        "single": "- ",
        "multiple": "+ ",
        "open": "----\n> ",
    }
    pieces = []
    for question in votes:
        entry = votes[question]
        pieces.append("\n# %s (%s)\n"%( question, entry['answer_type'] ))
        for answer in entry['answers']:
            marker = prefix_for.get(entry['answer_type'], "")
            pieces.append("%s%s\n"%( marker, answer, ))
    return "".join(pieces)
|
|
|
|
|
|
def votes_tsv(votes):
    """Render individual votes as tab-separated values.

    votes -- mapping: question -> {'answers': [answer, ...],
             'answer_type': ...}

    Returns a string with a header row followed by one quoted
    (question, type, answer) row per answer.  ASCII control characters
    (code points 0-31) are removed from the answer text.
    """
    # Translation table mapping every control character to None (delete).
    strip_control = dict.fromkeys(range(32))
    rows = ['"Question"\t"Question type"\t"Answer"\n']
    for question, entry in votes.items():
        for answer in entry['answers']:
            rows.append('"%s"\t"%s"\t"%s"\n'%(
                question,
                entry['answer_type'],
                answer.translate(strip_control),
            ))
    return "".join(rows)
|
|
|
|
|
|
|
|
def clear_votes(options):
    """Show the current results of a question set, then delete them.

    The summary is printed first on a best-effort basis: if it cannot be
    produced, a notice is printed and processing continues.  Unless
    ``options.really`` is set, nothing is deleted and the process exits
    with status 0.  Otherwise all rows of the ``answers`` table for
    ``options.name`` are deleted and the change is committed.
    """
    try:
        summary(options)
    except Exception:
        # Deliberately tolerant: votes must remain deletable even when
        # the summary can no longer be rendered.
        print("\nQuestions no longer available")

    if not options.really:
        print("\nNot really deleting results. use --really")
        sys.exit(0)

    db = open_db(options.db)
    db.cursor().execute(
        "DELETE FROM answers WHERE question_set = ?",
        (options.name,)
    )
    db.commit()
    print("\nDeleted votes for %s"%( options.name, ))
|
|
|
|
|
|
def clear_tokens(options):
    """Show the token counts of a question set, then delete its tokens.

    The result of get_token_counts is printed first.  Unless
    ``options.really`` is set, nothing is deleted and the process exits
    with status 0.  Otherwise all rows of the ``tokens`` table for
    ``options.name`` are deleted and the change is committed.
    """
    db = open_db(options.db)
    print(get_token_counts(db, options.name))
    if not options.really:
        print("\nNot really deleting tokens. use --really")
        sys.exit(0)

    # Reuse the connection opened above; opening the database a second
    # time only left the first connection dangling.
    cur = db.cursor()
    cur.execute(
        "DELETE FROM tokens WHERE question_set = ?",
        ( options.name, )
    )
    db.commit()
    print("\nDeleted tokens for %s"%( options.name, ))
|
|
|
|
|
|
def main(database, questions):
    """Parse the command line and run the selected sub-command.

    database  -- default database file name, passed to parse_options
    questions -- default question folder name, passed to parse_options

    Does nothing further when no known sub-command was selected.
    """
    options = parse_options(database, questions)

    # Sub-command name -> handler taking the parsed options.
    commands = {
        "token": manage_tokens,
        "show": show,
        "list": list_votes,
        "summary": summary,
        "clear-votes": clear_votes,
        "clear-tokens": clear_tokens,
    }
    handler = commands.get(options.subparser_name)
    if handler is not None:
        handler(options)
|
|
|