From baff2a984155f0cd3384a2d2028930d651f2a34c Mon Sep 17 00:00:00 2001 From: Q Date: Sat, 15 Mar 2025 21:43:36 +0200 Subject: [PATCH] spiller --- py-packages/spiller/setup.py | 2 +- py-packages/spiller/spiller/__init__.py | 4 +- py-packages/spiller/spiller/spiller.py | 290 +++++++++++++----------- 3 files changed, 158 insertions(+), 138 deletions(-) diff --git a/py-packages/spiller/setup.py b/py-packages/spiller/setup.py index 5ae1a47..cc66d5c 100644 --- a/py-packages/spiller/setup.py +++ b/py-packages/spiller/setup.py @@ -1,5 +1,5 @@ -from distutils.core import setup import os +from distutils.core import setup def version_reader(path): diff --git a/py-packages/spiller/spiller/__init__.py b/py-packages/spiller/spiller/__init__.py index 0f9d4c6..2eb004a 100644 --- a/py-packages/spiller/spiller/__init__.py +++ b/py-packages/spiller/spiller/__init__.py @@ -1,2 +1,2 @@ -__version__ = "0.3" -from spiller.spiller import retrieve, store, list_storage +__version__ = "0.4" +from spiller.spiller import Spiller, decrypt, encrypt diff --git a/py-packages/spiller/spiller/spiller.py b/py-packages/spiller/spiller/spiller.py index 5d64755..fbcc764 100755 --- a/py-packages/spiller/spiller/spiller.py +++ b/py-packages/spiller/spiller/spiller.py @@ -1,26 +1,13 @@ #!/usr/bin/env python3 -import json -import sys import argparse +import json import os -import subprocess import random import stat import string - - -def get_config(): - default_config = { - "SPILLER_STORAGE": os.path.expanduser("~/.config/spiller/storage.json") - } - - try: - with open(os.path.expanduser("~/.config/spiller/config.json"), "rt") as fp: - default_config.update(json.load(fp)) - except Exception: - pass - return default_config +import subprocess +import sys def get_opts(): @@ -56,9 +43,7 @@ def get_opts(): type=argparse.FileType("r"), help="Read the data to store from a file. Must use this or [data]. Will strip newlines at the end.", ) - set_parser.add_argument( - "--plain", action="store_true", default=False, help="Do not encrypt" - ) + set_parser.add_argument("--plain", action="store_true", default=False, help="Do not encrypt") set_parser.add_argument( "--key", action="store", @@ -96,6 +81,8 @@ def get_opts(): help="Name of secret to delete", ) args = parser.parse_args() + if args.command is None: + raise parser.error("Command missing") try: if args.key_file: @@ -116,153 +103,186 @@ def get_opts(): return args -def list_storage(): - """ - Get list of keys in the secret storage +class Spiller: + def __init__(self, storage_path=None): + self.config = self.get_config() + if storage_path is None: + self.storage_path = os.getenv("SPILLER_STORAGE", self.config["SPILLER_STORAGE"]) + else: + self.storage_path = storage_path + self.load_storage() + self.verbose = False - Args: + def get_config(self): + default_config = {"SPILLER_STORAGE": os.path.expanduser("~/.config/spiller/storage.json")} - Returns: - List[List]: List of name and encryption method - """ + try: + with open(os.path.expanduser("~/.config/spiller/config.json"), "rt") as fp: + default_config.update(json.load(fp)) + except Exception: + pass + return default_config - storage = load_storage() - names = [] - for name in sorted(storage.keys()): - names.append([name, storage[name]["encryption"]]) - names.sort(key=lambda x: x[1]) - return names + def list_storage(self): + """ + Get list of keys in the secret storage + Args: -def load_storage(): - try: - with open(JSON, "rt") as fp: - return json.load(fp) - except FileNotFoundError: - return {} + Returns: + List[List]: List of name and encryption method + """ + names = [] + for name in sorted(self.storage.keys()): + names.append([name, self.storage[name]["encryption"]]) + names.sort(key=lambda x: x[1]) + return names -def save_storage(storage): - if not os.path.exists(JSON): - os.makedirs(os.path.dirname(JSON), exist_ok=True) - with open(JSON, "wt") as fp: - json.dump(storage, fp, indent=2) - try: - os.chmod(JSON, stat.S_IWUSR | stat.S_IREAD) - except Exception: - pass + def format_storage(self): + """Format storage listing for printing""" + names = self.list_storage() + names.insert(0, ["Name", "Encryption"]) + padlen = max([len(x[0]) for x in names]) + values = [("{:" + str(padlen) + "} {}").format(*row) for row in names] + return "\n".join(values) -def del_storage(name): - storage = load_storage() - del storage[name] - save_storage(storage) - print("Deleted " + name) + def load_storage(self): + try: + with open(self.storage_path, "rt") as fp: + self.storage = json.load(fp) + except FileNotFoundError: + self.storage = {} + def save_storage(self): + if not os.path.exists(self.storage_path): + os.makedirs(os.path.dirname(self.storage_path), exist_ok=True) + with open(self.storage_path, "wt") as fp: + json.dump(self.storage, fp, indent=2) + try: + os.chmod(self.storage_path, stat.S_IWUSR | stat.S_IREAD) + except Exception: + pass -def store(name, data, key, plain): - """ - Store key to secrets storage. + def del_storage(name): + """writes directly !""" - Args: - name (str): Name of the secret. - data (str): Data to encrypt. - key (str): Encryption key. If None, randomly generate the key - plain (bool): If set, stores the data as is, without encryption - Returns: - str: Key used to encrypt, useful if it was generated. - """ + del self.storage[name] + self.save_storage() + if self.verbose: + print("Deleted " + name) - entry = {"encryption": "gpg", "data": data} - storage = load_storage() - if plain: - entry["encryption"] = "none" - else: + def store(self, name, data, key, plain): + """ + Store key to secrets storage. + + Args: + name (str): Name of the secret. + data (str): Data to encrypt. + key (str): Encryption key. If None, randomly generate the key + plain (bool): If set, stores the data as is, without encryption + Returns: + str: Key used to encrypt, useful if it was generated. + """ + + entry = {"encryption": "gpg", "data": data} + if plain: + entry["encryption"] = "none" + else: + if key == None: + key = get_random_key() + if self.verbose: + print("Random key: " + key) + entry["data"], ec = self.encrypt(data, key) + if ec != 0: + raise ValueError("Encryption Failed") + self.storage[name] = entry + self.save_storage() + return key + + def retrieve(self, name, key=None): + """ + Retrieve a secret from storage + + Args: + name (str): Name of the secret. + key (str): Encryption key, if any required. + + Returns: + str: Decrypted secret + """ + + entry = self.storage[name] + ec = 0 + if entry["encryption"] == "none": + value = entry["data"] + else: + value, ec = self.decrypt(entry["data"], key) + if ec != 0: + raise ValueError("Decryption Failed") + return value + + def encrypt(self, data, key): + """Return encrypted message, and exit code. If != 0, encryption failed""" + p = subprocess.run( + ["gpg", "-a", "--symmetric", "--batch", "--passphrase-fd", "0"], + input=f"{key}\n{data}".encode(), + capture_output=True, + ) + encrypted = p.stdout.decode() + if encrypted == "": + if self.verbose: + print("Encrypt failed!", file=sys.stderr) + None, 1 + return encrypted, 0 + + def decrypt(self, encrypted, key): + """Return decrypted message, and exit code. If != 0, decryption failed""" if key == None: - key = get_random_key() - print("Random key: " + key) - entry["data"] = encrypt(data, key) + if self.verbose: + print("Requires --key!", file=sys.stderr) + return None, 1 - storage[name] = entry - save_storage(storage) - return key - - -def retrieve(name, key=None): - """ - Retrieve a secret from storage - - Args: - name (str): Name of the secret. - key (str): Encryption key, if any required. - - Returns: - str: Decrypted secret - """ - - storage = load_storage() - entry = storage[name] - if entry["encryption"] == "none": - return entry["data"] - return decrypt(entry["data"], key) + p = subprocess.run( + ["gpg", "-d", "--batch", "--passphrase-fd", "0"], + input=f"{key}\n{encrypted}".encode(), + capture_output=True, + ) + data = p.stdout.decode() + if data == "": + if self.verbose: + print("Decrypt failed!", file=sys.stderr) + return None, 1 + return data, 0 def encrypt(data, key): - p = subprocess.run( - ["gpg", "-a", "--symmetric", "--batch", "--passphrase-fd", "0"], - input=f"{key}\n{data}".encode(), - capture_output=True, - ) - encrypted = p.stdout.decode() - if encrypted == "": - print("Encrypt failed!", file=sys.stderr) - sys.exit(1) - return encrypted + spill = Spiller() + return spill.encrypt(data, key)[0] def decrypt(encrypted, key): - if key == None: - print("Requires --key!", file=sys.stderr) - sys.exit(1) - - p = subprocess.run( - ["gpg", "-d", "--batch", "--passphrase-fd", "0"], - input=f"{key}\n{encrypted}".encode(), - capture_output=True, - ) - data = p.stdout.decode() - if data == "": - print("Decrypt failed!", file=sys.stderr) - sys.exit(1) - return data + spill = Spiller() + return spill.decrypt(encrypted, key)[0] def get_random_key(): return "-".join( - [ - "".join( - [random.choice(string.ascii_letters + string.digits) for x in range(8)] - ) - for x in range(5) - ] + ["".join([random.choice(string.ascii_letters + string.digits) for x in range(8)]) for x in range(5)] ) -CONFIG = get_config() -JSON = os.getenv("SPILLER_STORAGE", CONFIG["SPILLER_STORAGE"]) - - def main(): opts = get_opts() + spill = Spiller() + spill.verbose = True + if opts.command == "set": - store(opts.name, opts.data, opts.key, opts.plain) + spill.store(opts.name, opts.data, opts.key, opts.plain) if opts.command == "get": - print(retrieve(opts.name, opts.key)) + print(spill.retrieve(opts.name, opts.key)) if opts.command == "list": - names = list_storage() - names.insert(0, ["Name", "Encryption"]) - padlen = max([len(x[0]) for x in names]) - for row in names: - print(("{:" + str(padlen) + "} {}").format(*row)) + print(spill.format_storage()) if opts.command == "del": - del_storage(opts.name) + spill.del_storage(opts.name)