Files
qgpg/qgpg/__init__.py
2024-06-26 10:19:43 +03:00

293 lines
10 KiB
Python

#!/usr/bin/env python3
__version__ = "1.0"
import argparse
import os
import shutil
import sys
import tempfile
import time
from getpass import getpass
try:
import gnupg
except Exception:
raise ImportError("You need to install python package python-gnupg")
# Size of one chunk in bytes: 2**20 = 1 MiB.
chunk_size = 1024 * 1024
class MiniProgress:
    """
    Displays a progress bar with every .next() iteration. Starts the timer when instance created
    Call .finish() at the end.

    maxcount: expected number of .next() calls. When None or 0 the bar runs in
        indeterminate mode: it fills up and wraps around, and no ETA is shown.
    width: total line width in characters (int, or a numeric string such as the
        raw value of a command line option). Falsy means "detect the terminal
        width". Values below 20 are raised to 20.

    All output is written to stderr.
    """

    def __init__(self, maxcount=None, width=None):
        self.progress_chars = " ▏▎▍▌▋▊▉█"
        self.progress_n = 0
        self.progress_max = maxcount
        self.lastupdate = time.time()
        self.started = time.time()
        self.eta = None
        if width:
            # int() so that a string width (e.g. straight from argparse) works
            # instead of failing inside max() with a str/int comparison.
            self.width = max(20, int(width))
        else:
            self.width = max(20, shutil.get_terminal_size((80, 20)).columns - 2)
        # Narrow layout: 4 columns of percentage, 1 partial-block cell and a
        # 10 column "[hh:mm:ss]" ETA surround the bar.
        self.bar_width = self.width - 15
        self.bar_wide = False
        self.update_frequency = 1.5  # shortened later
        if self.width > 80:
            # Wide layout additionally shows the elapsed time (10 more columns).
            self.bar_wide = True
            self.bar_width = self.width - 25

    def next(self):
        """Count one step and redraw the bar if enough time has passed."""
        elapsed = time.time() - self.started
        if self.progress_n > 0 and self.progress_max and self.progress_n < self.progress_max:
            self.eta = elapsed / self.progress_n * (self.progress_max - self.progress_n)
        self.progress_n += 1
        if time.time() <= self.lastupdate + self.update_frequency:
            return
        # After the first redraw, refresh ten times a second.
        self.update_frequency = 0.1
        if self.progress_max:
            # Clamp: callers may report more steps than announced. The bar then
            # stays full instead of being reset to empty.
            done = min(self.progress_n, self.progress_max)
            chars = self.bar_width * done / self.progress_max
            completed = f"{int(100 * done / self.progress_max):3d}%"
        else:
            chars = self.progress_n / 9
            completed = " "
            if chars >= self.bar_width:
                # Indeterminate mode: start over once the bar is full.
                chars = 0
                self.progress_n = 0
        full_chars = int(chars)
        # The fractional part selects one of the nine partial-block glyphs.
        part_char = max(0, min(8, int(9 * (chars % 1))))
        full_pad = self.bar_width - full_chars
        s = (
            (f"\r{completed}")
            + (self._human_eta(elapsed) if self.bar_wide else "")
            + full_chars * self.progress_chars[-1]
            + self.progress_chars[part_char]
            + " " * full_pad
            + self._human_eta(self.eta)
        )
        print(s, file=sys.stderr, end="")
        sys.stderr.flush()
        self.lastupdate = time.time()

    def finish(self):
        """Draw the completed bar with the total elapsed time and end the line.

        Prints nothing when less than one update interval has passed since the
        instance was created.
        """
        if time.time() < self.started + self.update_frequency:
            return
        elapsed = time.time() - self.started
        full_pad = self.bar_width + 1
        s = (
            "\r100%"
            + ("[ ]" if self.bar_wide else "")
            + full_pad * self.progress_chars[-1]
            + self._human_eta(elapsed)
        )
        # Pad to the full line width (+1 for the leading "\r") so that nothing
        # from a longer, previously drawn progress line stays visible.
        s = s.ljust(self.width + 1)
        print(s, file=sys.stderr, end="\n")

    def _human_eta(self, eta):
        """Format seconds as "[hh:mm:ss]"; returns "" when eta is None."""
        if eta is None:
            return ""
        t_m, t_s = divmod(int(eta), 60)
        t_h, t_m = divmod(t_m, 60)
        return f"[{t_h:02d}:{t_m:02d}:{t_s:02d}]"
def get_opts(argv=None):
    """Returns command line arguments

    argv: optional list of argument strings to parse instead of sys.argv[1:].

    The returned namespace has the one-letter command aliases expanded to
    "keygen", "encrypt" or "decrypt", and width converted to int (None when
    not given). Invalid combinations end the program through parser.error().
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Wrapper to GPG, allowing recursive encryption.",
        epilog="""Private key passphrase can be set to env variable GPGPASS, otherwise will prompt
Usage
=====
- Generate keys:
qgpg --key ./path/mykey keygen
- This will create private and public keys, mykey and mykey.pub
- Share the public key to someone who will encrypt data to you
- Encrypt file:
qgpg --key ./mykey.pub encrypt ./plain_file.txt ./encrypted_file.txt.gpg
- Encrypt all files in a folder:
qgpg --key ./mykey.pub -r encrypt ./folder/
- Decrypt all files with the private key:
qgpg --key ./mykey -r decrypt ./encrypted_folder/
- Using passphrase in a variable:
GPGPASS=mysecretpassword qgpg --key ./mykey decrypt file.gpg file.txt
""",
    )
    # type=int: the progress bar does arithmetic on the width.
    parser.add_argument("--width", type=int, help="Console width in characters. Defaults to auto detect.")
    parser.add_argument("--no-progress", help="Disable progress meter.", default=False, action="store_true")
    parser.add_argument(
        "--key", help="path to recipient public key for encryption or private key for decryption", required=True
    )
    parser.add_argument("--name", help="Name of the owner of the key", required=False, default="recipient@address")
    parser.add_argument(
        "-r",
        "--recursive",
        default=False,
        action="store_true",
        help="Recursive encryption or decryption. Processes full folder structures",
    )
    parser.add_argument(
        "command",
        help="keygen/encrypt/decrypt. you can also use just the first letter.",
        choices=["k", "e", "d", "keygen", "encrypt", "decrypt"],
    )
    parser.add_argument("path", help="path to file to encrypt/decrypt, or folder if recursive", nargs="?")
    parser.add_argument("out_path", help="output file, if not using recursive.", nargs="?")
    parsed = parser.parse_args(argv)
    if parsed.out_path and parsed.recursive:
        parser.error("Cannot assign output path, when in recursive mode")
    # Expand one-letter aliases to the full command name.
    aliases = {"k": "keygen", "e": "encrypt", "d": "decrypt"}
    parsed.command = aliases.get(parsed.command, parsed.command)
    # path is declared optional because keygen does not use it; the other
    # commands cannot work without it, so reject that here with a usage error.
    if parsed.command in ("encrypt", "decrypt") and not parsed.path:
        parser.error(f"path is required for {parsed.command}")
    return parsed
class Collector:
    """Chunk sink that writes received data to a file and drives a progress bar.

    Instances are callable with one chunk of bytes at a time; an empty chunk
    marks the end of the data.
    """

    def __init__(self, filename, width, bufcount):
        # bufcount is the expected number of chunks, used as the bar's maximum.
        self.progress = MiniProgress(bufcount, width)
        self.out = open(filename, "wb")

    def __call__(self, chunk):
        """Store one chunk; on the empty end-of-data chunk close the file.

        Always returns False.
        NOTE(review): presumably False tells python-gnupg not to buffer the
        data itself - confirm against the library's on_data documentation.
        """
        self.progress.next()
        self.out.write(chunk)
        if chunk:
            return False
        # Empty chunk: the stream is complete.
        self.out.close()
        self.progress.finish()
        return False
class Processor:
    """Runs the command chosen on the command line: keygen, encrypt or decrypt.

    Creating an instance parses the command line, sets up a GnuPG home in a
    temporary directory, performs the whole command and removes the temporary
    directory again, also when the command fails or exits early.
    """

    def __init__(self):
        self.opts = get_opts()
        self.homedir = tempfile.TemporaryDirectory()
        try:
            self.gpg = gnupg.GPG(gnupghome=self.homedir.name)
            self.gpg.encoding = "utf-8"
            self.suffix = ".gpg"
            self._run()
        finally:
            # The keyring holds imported or generated keys; remove it even
            # when an exception or sys.exit() interrupts the command.
            self.homedir.cleanup()

    def _run(self):
        """Dispatch on the command and process every selected file."""
        if self.opts.command == "keygen":
            self.keygen()
        filelist = []
        if self.opts.command in ("encrypt", "decrypt"):
            import_result = self.gpg.import_keys_file(self.opts.key)
            filelist = self.get_filelist(self.opts.path, self.opts.recursive, self.opts.command)
        if self.opts.command == "encrypt":
            self._select_recipient(import_result)
        if self.opts.command == "decrypt":
            self.set_phrase()
        total = len(filelist)
        for index, name in enumerate(filelist, start=1):
            print(f"{index}/{total} {self.opts.command}ing {name} ", file=sys.stderr)
            self.process_single(name)

    def _select_recipient(self, import_result):
        """Set self.uid to the recipient to encrypt to, from the imported key."""
        if not import_result.fingerprints:
            print(f"Could not import a key from {self.opts.key}", file=sys.stderr)
            sys.exit(1)
        keyid = import_result.fingerprints[0]
        # Fall back to the fingerprint so self.uid is always defined, even if
        # the key listing has no matching entry or no user id.
        self.uid = keyid
        for key in self.gpg.list_keys():
            if key["fingerprint"] == keyid and key["uids"]:
                self.uid = key["uids"][0]
                break

    def set_phrase(self, twice=False):
        """Store the private key passphrase in self.phrase.

        The GPGPASS environment variable is used when set; otherwise the user
        is prompted, and prompted a second time for confirmation when twice is
        true. Exits with status 1 when the two entries differ.
        """
        self.phrase = os.environ.get("GPGPASS", None)
        if self.phrase is not None:
            return
        print("Type private key passphrase [Empty if no passphrase]")
        self.phrase = getpass()
        if twice:
            print("Type private key passphrase again to confirm")
            if getpass() != self.phrase:
                print("Passphrases do not match!")
                sys.exit(1)

    def get_filelist(self, root, recurse, direction):
        """Return the list of files to process.

        Without recurse, root must be an existing file and is returned as the
        only element; otherwise ValueError is raised. With recurse, the tree
        below root is walked in sorted order. For "encrypt", files that already
        end with the suffix are skipped; for "decrypt", only files ending with
        the suffix are kept.
        """
        if not recurse:
            if os.path.isfile(root):
                return [root]
            raise ValueError("path is not a file, and no --recursive")
        filelist = []
        for path, dirs, files in os.walk(root):
            # Sorting dirs in place also fixes the order of the walk itself.
            dirs.sort()
            for name in sorted(files):
                has_suffix = name.endswith(self.suffix)
                if direction == "encrypt" and has_suffix:
                    continue
                if direction == "decrypt" and not has_suffix:
                    continue
                filelist.append(os.path.join(path, name))
        return filelist

    def keygen(self):
        """Generate an RSA key pair and write it to <key> and <key>.pub.

        Exits with status 1 when the private key file already exists or when
        key generation does not produce a fingerprint.
        """
        if os.path.exists(self.opts.key):
            print(f"File {self.opts.key} already exists")
            sys.exit(1)
        self.set_phrase(twice=True)
        input_data = self.gpg.gen_key_input(
            key_type="RSA",
            key_length=2048,
            name_email=self.opts.name,
            passphrase=self.phrase,
            no_protection=self.phrase == "",
        )
        key = self.gpg.gen_key(input_data)
        if not key.fingerprint:
            print("Key generation failed", file=sys.stderr)
            sys.exit(1)
        ascii_armored_public_keys = self.gpg.export_keys(key.fingerprint)
        ascii_armored_private_keys = self.gpg.export_keys(key.fingerprint, True, passphrase=self.phrase)
        with open(self.opts.key + ".pub", "wt") as fp:
            fp.write(ascii_armored_public_keys)
        # Create the private key file with owner-only permissions from the
        # start, so it is never readable by others between creation and chmod.
        # O_EXCL refuses to write into a file that appeared in the meantime.
        fd = os.open(self.opts.key, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        with os.fdopen(fd, "wt") as fp:
            fp.write(ascii_armored_private_keys)
        os.chmod(self.opts.key, 0o600)
        print(f"Generated {self.opts.key} and {self.opts.key}.pub", file=sys.stderr)

    def _auto_output_path(self, in_file):
        """Derive the output path from in_file, or None when that is impossible.

        Encrypting appends the suffix. Decrypting removes it, which only works
        when in_file actually ends with the suffix and has something before it.
        """
        if self.opts.command == "encrypt":
            return in_file + self.suffix
        if in_file.endswith(self.suffix) and len(in_file) > len(self.suffix):
            return in_file[: -len(self.suffix)]
        return None

    def _discard_output(self, collector, out_file):
        """Remove an incomplete output file, closing the collector's handle first."""
        if collector is not None and not collector.out.closed:
            collector.out.close()
        if os.path.exists(out_file):
            os.unlink(out_file)

    def process_single(self, in_file):
        """Encrypt or decrypt one file, according to self.opts.command.

        The output goes to self.opts.out_path when given, otherwise to a path
        derived from in_file. Existing output files are never overwritten. A
        failed or interrupted operation removes its incomplete output file;
        an interrupt additionally exits with status 1.
        """
        out_file = self.opts.out_path if self.opts.out_path else self._auto_output_path(in_file)
        if out_file is None:
            print(
                f"{in_file} does not end with {self.suffix}, output path must be given",
                file=sys.stderr,
            )
            return
        if os.path.exists(out_file):
            print(f"{out_file} already exists", file=sys.stderr)
            return
        file_size = os.path.getsize(in_file)
        if self.opts.no_progress:
            # Without a collector nothing would store the result, so let gpg
            # write the output file itself.
            collector = None
            extra = {"output": out_file}
        else:
            collector = Collector(out_file, self.opts.width, file_size // self.gpg.buffer_size)
            extra = {}
        self.gpg.on_data = collector
        try:
            with open(in_file, "rb") as fp:
                if self.opts.command == "encrypt":
                    d = self.gpg.encrypt_file(fp, self.uid, always_trust=True, armor=False, **extra)
                else:
                    d = self.gpg.decrypt_file(fp, passphrase=self.phrase, always_trust=True, **extra)
            if not d.ok:
                print(d.status, file=sys.stderr)
                # Do not leave an empty or partial file behind: the next run
                # would skip it as "already exists".
                self._discard_output(collector, out_file)
        except KeyboardInterrupt:
            self._discard_output(collector, out_file)
            print("Process cancelled", file=sys.stderr)
            sys.exit(1)
def main():
    """Command line entry point: all work happens while a Processor is constructed."""
    Processor()


if __name__ == "__main__":
    main()