version 1.0
This commit is contained in:
33
Bakefile
Normal file
33
Bakefile
Normal file
@@ -0,0 +1,33 @@
|
||||
|
||||
|
||||
install_useve() {
|
||||
set -e
|
||||
. useve-runner
|
||||
py-format .
|
||||
useve up qgpg
|
||||
|
||||
}
|
||||
|
||||
test_stuff() {
|
||||
cd ~/tmp/
|
||||
. useve-runner
|
||||
|
||||
useve qgpg
|
||||
rm -f key2* peak.gpg
|
||||
GPGPASS=secret @ qgpg --key key2 k
|
||||
@ find datadir -type f -name '*gpg' | xargs -II rm -v I
|
||||
@ dd if=/dev/random of=datadir/testfile bs=1024 count=102400
|
||||
@ qgpg --key key2.pub e datadir/lockscreens/peakpx.jpg
|
||||
@ qgpg --key key2.pub e datadir/lockscreens/peakpx.jpg peak.gpg
|
||||
@ qgpg --key key2.pub -r e datadir
|
||||
@ qgpg --key key2.pub -r e datadir
|
||||
}
|
||||
|
||||
test_decrypt() {
|
||||
cd ~/tmp/
|
||||
. useve-runner
|
||||
|
||||
useve qgpg
|
||||
@ find datadir -type f -name '*jpg' | xargs -II rm -v I
|
||||
@ qgpg --key key2 -r d datadir
|
||||
}
|
||||
264
qgpg/__init__.py
Normal file
264
qgpg/__init__.py
Normal file
@@ -0,0 +1,264 @@
|
||||
__version__ = "1.0"
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from getpass import getpass
|
||||
|
||||
import gnupg
|
||||
|
||||
chunk_size = 1024 * 1024 * 1 # = 1 Mb
|
||||
|
||||
|
||||
class MiniProgress:
|
||||
"""
|
||||
Displays a progress bar with every .next() iteration. Starts the timer when instance created
|
||||
Call .finish() at the end.
|
||||
"""
|
||||
|
||||
def __init__(self, maxcount=None, width=None):
|
||||
self.progress_chars = " ▏▎▍▌▋▊▉█"
|
||||
self.progress_n = 0
|
||||
self.progress_max = maxcount
|
||||
self.lastupdate = time.time()
|
||||
self.started = time.time()
|
||||
self.eta = None
|
||||
self.width = max(20, width) if width else max(20, shutil.get_terminal_size((80, 20)).columns - 2)
|
||||
self.bar_width = self.width - 15
|
||||
self.bar_wide = False
|
||||
self.update_frequency = 1.5 # shortened later
|
||||
if self.width > 80:
|
||||
self.bar_wide = True
|
||||
self.bar_width = self.width - 25
|
||||
|
||||
def next(self):
|
||||
elapsed = time.time() - self.started
|
||||
|
||||
if self.progress_n > 0 and self.progress_max and self.progress_n < self.progress_max:
|
||||
self.eta = elapsed / self.progress_n * (self.progress_max - self.progress_n)
|
||||
|
||||
self.progress_n += 1
|
||||
|
||||
if time.time() > self.lastupdate + self.update_frequency:
|
||||
self.update_frequency = 0.1
|
||||
if self.progress_max:
|
||||
chars = self.bar_width * self.progress_n / self.progress_max
|
||||
completed = f"{int(100*self.progress_n / self.progress_max): 3d}%"
|
||||
else:
|
||||
chars = self.progress_n / 9
|
||||
completed = " "
|
||||
if chars >= self.bar_width:
|
||||
chars = 0
|
||||
self.progress_n = 0
|
||||
|
||||
full_chars = int(chars)
|
||||
part_char = max(0, min(8, int(9 * (chars % 1))))
|
||||
full_pad = self.bar_width - full_chars
|
||||
|
||||
s = (
|
||||
(f"\r{completed}")
|
||||
+ (self._human_eta(elapsed) if self.bar_wide else "")
|
||||
+ full_chars * self.progress_chars[-1]
|
||||
+ self.progress_chars[part_char]
|
||||
+ " " * full_pad
|
||||
+ self._human_eta(self.eta)
|
||||
)
|
||||
print(s, file=sys.stderr, end="")
|
||||
sys.stderr.flush()
|
||||
self.lastupdate = time.time()
|
||||
|
||||
def finish(self):
|
||||
if time.time() < self.started + self.update_frequency:
|
||||
return
|
||||
completed = 100
|
||||
elapsed = time.time() - self.started
|
||||
full_pad = self.bar_width
|
||||
s = (
|
||||
f"\r{completed: 3d}%"
|
||||
+ ("[ ]" if self.bar_wide else "")
|
||||
+ full_pad * self.progress_chars[-1]
|
||||
+ self._human_eta(elapsed)
|
||||
)
|
||||
print(s, file=sys.stderr, end="\n")
|
||||
|
||||
def _human_eta(self, eta):
|
||||
|
||||
if eta is None:
|
||||
return ""
|
||||
t_m, t_s = divmod(int(eta), 60)
|
||||
t_h, t_m = divmod(t_m, 60)
|
||||
return f"[{t_h:02d}:{t_m:02d}:{t_s:02d}]"
|
||||
|
||||
|
||||
def get_opts():
|
||||
"""Returns command line arguments"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Recursive gpg encryption. ",
|
||||
epilog="Private key passphrase can be set to env variable GPGPASS, otherwise will prompt",
|
||||
)
|
||||
parser.add_argument("--width", help="Console width in characters. Defaults to auto detect.")
|
||||
parser.add_argument("--no-progress", help="Disable progress meter.", default=False, action="store_true")
|
||||
parser.add_argument(
|
||||
"--key", help="path to recipient public key for encryption or private key for decryption", required=True
|
||||
)
|
||||
parser.add_argument("--name", help="Name of the owner of the key", required=False, default="recipient@address")
|
||||
parser.add_argument(
|
||||
"-r",
|
||||
"--recursive",
|
||||
default=False,
|
||||
action="store_true",
|
||||
help="Recursive encryption or decryption. Processes full folder structures",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"command",
|
||||
help="keygen/encrypt/decrypt. you can also use just the first letter.",
|
||||
choices=["k", "e", "d", "keygen", "encrypt", "decrypt"],
|
||||
)
|
||||
parser.add_argument("path", help="path to file to encrypt/decrypt, or folder if recursive", nargs="?")
|
||||
parser.add_argument("out_path", help="output file, if not using recursive.", nargs="?")
|
||||
parsed = parser.parse_args()
|
||||
if parsed.out_path and parsed.recursive:
|
||||
parser.error("Cannot assign output path, when in recursive mode")
|
||||
for cmd in ("keygen", "encrypt", "decrypt"):
|
||||
if parsed.command == cmd[0]:
|
||||
parsed.command = cmd
|
||||
return parsed
|
||||
|
||||
|
||||
class Collector:
|
||||
def __init__(self, filename, width, bufcount):
|
||||
|
||||
self.progress = MiniProgress(bufcount, width)
|
||||
self.out = open(filename, "wb")
|
||||
|
||||
def __call__(self, chunk):
|
||||
self.progress.next()
|
||||
self.out.write(chunk)
|
||||
if not chunk:
|
||||
self.out.close()
|
||||
self.progress.finish()
|
||||
return False
|
||||
|
||||
|
||||
class Processor:
|
||||
def __init__(self):
|
||||
self.opts = get_opts()
|
||||
self.homedir = tempfile.TemporaryDirectory()
|
||||
self.gpg = gnupg.GPG(gnupghome=self.homedir.name)
|
||||
self.gpg.encoding = "utf-8"
|
||||
self.suffix = ".gpg"
|
||||
|
||||
if self.opts.command == "keygen":
|
||||
self.keygen()
|
||||
filelist = []
|
||||
|
||||
if self.opts.command in ("encrypt", "decrypt"):
|
||||
import_result = self.gpg.import_keys_file(self.opts.key)
|
||||
filelist = self.get_filelist(self.opts.path, self.opts.recursive, self.opts.command)
|
||||
|
||||
if self.opts.command == "encrypt":
|
||||
|
||||
keyid = import_result.fingerprints[0]
|
||||
public_keys = self.gpg.list_keys()
|
||||
for key in public_keys:
|
||||
if key["fingerprint"] == keyid:
|
||||
self.uid = key["uids"][0]
|
||||
|
||||
if self.opts.command == "decrypt":
|
||||
self.set_phrase()
|
||||
n_f = len(filelist)
|
||||
for i, f in enumerate(filelist):
|
||||
print(f"{i}/{n_f} {f} {self.opts.command}ing", file=sys.stderr)
|
||||
self.process_single(f)
|
||||
|
||||
self.homedir.cleanup()
|
||||
|
||||
def set_phrase(self, twice=False):
|
||||
|
||||
self.phrase = os.environ.get("GPGPASS", None)
|
||||
if self.phrase is None:
|
||||
print("Type private key passphrase [Empty if no passphrase]")
|
||||
self.phrase = getpass()
|
||||
if twice:
|
||||
print("Type private key passphrase again to confirm")
|
||||
phrase2 = getpass()
|
||||
if phrase2 != self.phrase:
|
||||
print("Passphrases do not match!")
|
||||
sys.exit(1)
|
||||
|
||||
def get_filelist(self, root, recurse, direction):
|
||||
if not recurse:
|
||||
if os.path.isfile(root):
|
||||
return [root]
|
||||
raise ValueError("path is not a file, and no --recursive")
|
||||
filelist = []
|
||||
for path, dirs, files in os.walk(root):
|
||||
dirs.sort()
|
||||
files.sort()
|
||||
for f in files:
|
||||
if direction == "encrypt" and f.endswith(self.suffix):
|
||||
continue
|
||||
if direction == "decrypt" and not f.endswith(self.suffix):
|
||||
continue
|
||||
filelist.append(os.path.join(path, f))
|
||||
return filelist
|
||||
|
||||
def keygen(self):
|
||||
if os.path.exists(self.opts.key):
|
||||
print(f"File {self.opts.key} already exists")
|
||||
sys.exit(1)
|
||||
|
||||
self.set_phrase(twice=True)
|
||||
input_data = self.gpg.gen_key_input(
|
||||
key_type="RSA",
|
||||
key_length=2048,
|
||||
name_email=self.opts.name,
|
||||
passphrase=self.phrase,
|
||||
no_protection=self.phrase == "",
|
||||
)
|
||||
key = self.gpg.gen_key(input_data)
|
||||
|
||||
ascii_armored_public_keys = self.gpg.export_keys(key.fingerprint)
|
||||
ascii_armored_private_keys = self.gpg.export_keys(key.fingerprint, True, passphrase=self.phrase)
|
||||
|
||||
with open(self.opts.key + ".pub", "wt") as fp:
|
||||
fp.write(ascii_armored_public_keys)
|
||||
with open(self.opts.key, "wt") as fp:
|
||||
fp.write(ascii_armored_private_keys)
|
||||
os.chmod(self.opts.key, 0o600)
|
||||
print(f"Generated {self.opts.key} and {self.opts.key}.pub", file=sys.stderr)
|
||||
|
||||
def process_single(self, in_file):
|
||||
|
||||
if self.opts.command == "encrypt":
|
||||
auto_path = in_file + self.suffix
|
||||
if self.opts.command == "decrypt":
|
||||
auto_path = in_file[: -len(self.suffix)]
|
||||
out_file = self.opts.out_path if self.opts.out_path else auto_path
|
||||
if os.path.exists(out_file):
|
||||
print(f"{out_file} already exists", file=sys.stderr)
|
||||
return
|
||||
|
||||
file_size = os.path.getsize(in_file)
|
||||
self.gpg.on_data = Collector(out_file, self.opts.width, int(file_size / self.gpg.buffer_size))
|
||||
try:
|
||||
with open(in_file, "rb") as fp:
|
||||
if self.opts.command == "encrypt":
|
||||
d = self.gpg.encrypt_file(fp, self.uid, always_trust=True, armor=False)
|
||||
if self.opts.command == "decrypt":
|
||||
d = self.gpg.decrypt_file(fp, passphrase=self.phrase, always_trust=True)
|
||||
if not d.ok:
|
||||
print(d.status, file=sys.stderr)
|
||||
except KeyboardInterrupt:
|
||||
if os.path.exists(out_file):
|
||||
os.unlink(out_file)
|
||||
print("Process cancelled", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main():
|
||||
Processor()
|
||||
Reference in New Issue
Block a user