From 9158b06007298875267d4f9978ee49eeaf74b9ac Mon Sep 17 00:00:00 2001 From: Q Date: Wed, 26 Jun 2024 07:26:40 +0300 Subject: [PATCH] version 1.0 --- Bakefile | 33 ++++++ qgpg/__init__.py | 264 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 297 insertions(+) create mode 100644 Bakefile create mode 100644 qgpg/__init__.py diff --git a/Bakefile b/Bakefile new file mode 100644 index 0000000..37c37ad --- /dev/null +++ b/Bakefile @@ -0,0 +1,33 @@ + + +install_useve() { + set -e + . useve-runner + py-format . + useve up qgpg + +} + +test_stuff() { + cd ~/tmp/ + . useve-runner + + useve qgpg + rm -f key2* peak.gpg + GPGPASS=secret @ qgpg --key key2 k + @ find datadir -type f -name '*gpg' | xargs -II rm -v I + @ dd if=/dev/random of=datadir/testfile bs=1024 count=102400 + @ qgpg --key key2.pub e datadir/lockscreens/peakpx.jpg + @ qgpg --key key2.pub e datadir/lockscreens/peakpx.jpg peak.gpg + @ qgpg --key key2.pub -r e datadir + @ qgpg --key key2.pub -r e datadir +} + +test_decrypt() { + cd ~/tmp/ + . useve-runner + + useve qgpg + @ find datadir -type f -name '*jpg' | xargs -II rm -v I + @ qgpg --key key2 -r d datadir +} diff --git a/qgpg/__init__.py b/qgpg/__init__.py new file mode 100644 index 0000000..abd70da --- /dev/null +++ b/qgpg/__init__.py @@ -0,0 +1,264 @@ +__version__ = "1.0" + +import argparse +import os +import shutil +import sys +import tempfile +import time +from getpass import getpass + +import gnupg + +chunk_size = 1024 * 1024 * 1 # = 1 Mb + + +class MiniProgress: + """ + Displays a progress bar with every .next() iteration. Starts the timer when instance created + Call .finish() at the end. + """ + + def __init__(self, maxcount=None, width=None): + self.progress_chars = " ▏▎▍▌▋▊▉█" + self.progress_n = 0 + self.progress_max = maxcount + self.lastupdate = time.time() + self.started = time.time() + self.eta = None + self.width = max(20, width) if width else max(20, shutil.get_terminal_size((80, 20)).columns - 2) + self.bar_width = self.width - 15 + self.bar_wide = False + self.update_frequency = 1.5 # shortened later + if self.width > 80: + self.bar_wide = True + self.bar_width = self.width - 25 + + def next(self): + elapsed = time.time() - self.started + + if self.progress_n > 0 and self.progress_max and self.progress_n < self.progress_max: + self.eta = elapsed / self.progress_n * (self.progress_max - self.progress_n) + + self.progress_n += 1 + + if time.time() > self.lastupdate + self.update_frequency: + self.update_frequency = 0.1 + if self.progress_max: + chars = self.bar_width * self.progress_n / self.progress_max + completed = f"{int(100*self.progress_n / self.progress_max): 3d}%" + else: + chars = self.progress_n / 9 + completed = " " + if chars >= self.bar_width: + chars = 0 + self.progress_n = 0 + + full_chars = int(chars) + part_char = max(0, min(8, int(9 * (chars % 1)))) + full_pad = self.bar_width - full_chars + + s = ( + (f"\r{completed}") + + (self._human_eta(elapsed) if self.bar_wide else "") + + full_chars * self.progress_chars[-1] + + self.progress_chars[part_char] + + " " * full_pad + + self._human_eta(self.eta) + ) + print(s, file=sys.stderr, end="") + sys.stderr.flush() + self.lastupdate = time.time() + + def finish(self): + if time.time() < self.started + self.update_frequency: + return + completed = 100 + elapsed = time.time() - self.started + full_pad = self.bar_width + s = ( + f"\r{completed: 3d}%" + + ("[ ]" if self.bar_wide else "") + + full_pad * self.progress_chars[-1] + + self._human_eta(elapsed) + ) + print(s, file=sys.stderr, end="\n") + + def _human_eta(self, eta): + + if eta is None: + return "" + t_m, t_s = divmod(int(eta), 60) + t_h, t_m = divmod(t_m, 60) + return f"[{t_h:02d}:{t_m:02d}:{t_s:02d}]" + + +def get_opts(): + """Returns command line arguments""" + parser = argparse.ArgumentParser( + description="Recursive gpg encryption. ", + epilog="Private key passphrase can be set to env variable GPGPASS, otherwise will prompt", + ) + parser.add_argument("--width", help="Console width in characters. Defaults to auto detect.") + parser.add_argument("--no-progress", help="Disable progress meter.", default=False, action="store_true") + parser.add_argument( + "--key", help="path to recipient public key for encryption or private key for decryption", required=True + ) + parser.add_argument("--name", help="Name of the owner of the key", required=False, default="recipient@address") + parser.add_argument( + "-r", + "--recursive", + default=False, + action="store_true", + help="Recursive encryption or decryption. Processes full folder structures", + ) + + parser.add_argument( + "command", + help="keygen/encrypt/decrypt. you can also use just the first letter.", + choices=["k", "e", "d", "keygen", "encrypt", "decrypt"], + ) + parser.add_argument("path", help="path to file to encrypt/decrypt, or folder if recursive", nargs="?") + parser.add_argument("out_path", help="output file, if not using recursive.", nargs="?") + parsed = parser.parse_args() + if parsed.out_path and parsed.recursive: + parser.error("Cannot assign output path, when in recursive mode") + for cmd in ("keygen", "encrypt", "decrypt"): + if parsed.command == cmd[0]: + parsed.command = cmd + return parsed + + +class Collector: + def __init__(self, filename, width, bufcount): + + self.progress = MiniProgress(bufcount, width) + self.out = open(filename, "wb") + + def __call__(self, chunk): + self.progress.next() + self.out.write(chunk) + if not chunk: + self.out.close() + self.progress.finish() + return False + + +class Processor: + def __init__(self): + self.opts = get_opts() + self.homedir = tempfile.TemporaryDirectory() + self.gpg = gnupg.GPG(gnupghome=self.homedir.name) + self.gpg.encoding = "utf-8" + self.suffix = ".gpg" + + if self.opts.command == "keygen": + self.keygen() + filelist = [] + + if self.opts.command in ("encrypt", "decrypt"): + import_result = self.gpg.import_keys_file(self.opts.key) + filelist = self.get_filelist(self.opts.path, self.opts.recursive, self.opts.command) + + if self.opts.command == "encrypt": + + keyid = import_result.fingerprints[0] + public_keys = self.gpg.list_keys() + for key in public_keys: + if key["fingerprint"] == keyid: + self.uid = key["uids"][0] + + if self.opts.command == "decrypt": + self.set_phrase() + n_f = len(filelist) + for i, f in enumerate(filelist): + print(f"{i}/{n_f} {f} {self.opts.command}ing", file=sys.stderr) + self.process_single(f) + + self.homedir.cleanup() + + def set_phrase(self, twice=False): + + self.phrase = os.environ.get("GPGPASS", None) + if self.phrase is None: + print("Type private key passphrase [Empty if no passphrase]") + self.phrase = getpass() + if twice: + print("Type private key passphrase again to confirm") + phrase2 = getpass() + if phrase2 != self.phrase: + print("Passphrases do not match!") + sys.exit(1) + + def get_filelist(self, root, recurse, direction): + if not recurse: + if os.path.isfile(root): + return [root] + raise ValueError("path is not a file, and no --recursive") + filelist = [] + for path, dirs, files in os.walk(root): + dirs.sort() + files.sort() + for f in files: + if direction == "encrypt" and f.endswith(self.suffix): + continue + if direction == "decrypt" and not f.endswith(self.suffix): + continue + filelist.append(os.path.join(path, f)) + return filelist + + def keygen(self): + if os.path.exists(self.opts.key): + print(f"File {self.opts.key} already exists") + sys.exit(1) + + self.set_phrase(twice=True) + input_data = self.gpg.gen_key_input( + key_type="RSA", + key_length=2048, + name_email=self.opts.name, + passphrase=self.phrase, + no_protection=self.phrase == "", + ) + key = self.gpg.gen_key(input_data) + + ascii_armored_public_keys = self.gpg.export_keys(key.fingerprint) + ascii_armored_private_keys = self.gpg.export_keys(key.fingerprint, True, passphrase=self.phrase) + + with open(self.opts.key + ".pub", "wt") as fp: + fp.write(ascii_armored_public_keys) + with open(self.opts.key, "wt") as fp: + fp.write(ascii_armored_private_keys) + os.chmod(self.opts.key, 0o600) + print(f"Generated {self.opts.key} and {self.opts.key}.pub", file=sys.stderr) + + def process_single(self, in_file): + + if self.opts.command == "encrypt": + auto_path = in_file + self.suffix + if self.opts.command == "decrypt": + auto_path = in_file[: -len(self.suffix)] + out_file = self.opts.out_path if self.opts.out_path else auto_path + if os.path.exists(out_file): + print(f"{out_file} already exists", file=sys.stderr) + return + + file_size = os.path.getsize(in_file) + self.gpg.on_data = Collector(out_file, self.opts.width, int(file_size / self.gpg.buffer_size)) + try: + with open(in_file, "rb") as fp: + if self.opts.command == "encrypt": + d = self.gpg.encrypt_file(fp, self.uid, always_trust=True, armor=False) + if self.opts.command == "decrypt": + d = self.gpg.decrypt_file(fp, passphrase=self.phrase, always_trust=True) + if not d.ok: + print(d.status, file=sys.stderr) + except KeyboardInterrupt: + if os.path.exists(out_file): + os.unlink(out_file) + print("Process cancelled", file=sys.stderr) + sys.exit(1) + + +def main(): + Processor()