This commit is contained in:
Q
2025-03-15 21:43:36 +02:00
parent 779f22f60b
commit baff2a9841
3 changed files with 158 additions and 138 deletions

View File

@@ -1,5 +1,5 @@
from distutils.core import setup
import os import os
from distutils.core import setup
def version_reader(path): def version_reader(path):

View File

@@ -1,2 +1,2 @@
__version__ = "0.3" __version__ = "0.4"
from spiller.spiller import retrieve, store, list_storage from spiller.spiller import Spiller, decrypt, encrypt

View File

@@ -1,26 +1,13 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import json
import sys
import argparse import argparse
import json
import os import os
import subprocess
import random import random
import stat import stat
import string import string
import subprocess
import sys
def get_config():
default_config = {
"SPILLER_STORAGE": os.path.expanduser("~/.config/spiller/storage.json")
}
try:
with open(os.path.expanduser("~/.config/spiller/config.json"), "rt") as fp:
default_config.update(json.load(fp))
except Exception:
pass
return default_config
def get_opts(): def get_opts():
@@ -56,9 +43,7 @@ def get_opts():
type=argparse.FileType("r"), type=argparse.FileType("r"),
help="Read the data to store from a file. Must use this or [data]. Will strip newlines at the end.", help="Read the data to store from a file. Must use this or [data]. Will strip newlines at the end.",
) )
set_parser.add_argument( set_parser.add_argument("--plain", action="store_true", default=False, help="Do not encrypt")
"--plain", action="store_true", default=False, help="Do not encrypt"
)
set_parser.add_argument( set_parser.add_argument(
"--key", "--key",
action="store", action="store",
@@ -96,6 +81,8 @@ def get_opts():
help="Name of secret to delete", help="Name of secret to delete",
) )
args = parser.parse_args() args = parser.parse_args()
if args.command is None:
raise parser.error("Command missing")
try: try:
if args.key_file: if args.key_file:
@@ -116,153 +103,186 @@ def get_opts():
return args return args
def list_storage(): class Spiller:
""" def __init__(self, storage_path=None):
Get list of keys in the secret storage self.config = self.get_config()
if storage_path is None:
self.storage_path = os.getenv("SPILLER_STORAGE", self.config["SPILLER_STORAGE"])
else:
self.storage_path = storage_path
self.load_storage()
self.verbose = False
Args: def get_config(self):
default_config = {"SPILLER_STORAGE": os.path.expanduser("~/.config/spiller/storage.json")}
Returns: try:
List[List]: List of name and encryption method with open(os.path.expanduser("~/.config/spiller/config.json"), "rt") as fp:
""" default_config.update(json.load(fp))
except Exception:
pass
return default_config
storage = load_storage() def list_storage(self):
names = [] """
for name in sorted(storage.keys()): Get list of keys in the secret storage
names.append([name, storage[name]["encryption"]])
names.sort(key=lambda x: x[1])
return names
Args:
def load_storage(): Returns:
try: List[List]: List of name and encryption method
with open(JSON, "rt") as fp: """
return json.load(fp)
except FileNotFoundError:
return {}
names = []
for name in sorted(self.storage.keys()):
names.append([name, self.storage[name]["encryption"]])
names.sort(key=lambda x: x[1])
return names
def save_storage(storage): def format_storage(self):
if not os.path.exists(JSON): """Format storage listing for printing"""
os.makedirs(os.path.dirname(JSON), exist_ok=True)
with open(JSON, "wt") as fp:
json.dump(storage, fp, indent=2)
try:
os.chmod(JSON, stat.S_IWUSR | stat.S_IREAD)
except Exception:
pass
names = self.list_storage()
names.insert(0, ["Name", "Encryption"])
padlen = max([len(x[0]) for x in names])
values = [("{:" + str(padlen) + "} {}").format(*row) for row in names]
return "\n".join(values)
def del_storage(name): def load_storage(self):
storage = load_storage() try:
del storage[name] with open(self.storage_path, "rt") as fp:
save_storage(storage) self.storage = json.load(fp)
print("Deleted " + name) except FileNotFoundError:
self.storage = {}
def save_storage(self):
if not os.path.exists(self.storage_path):
os.makedirs(os.path.dirname(self.storage_path), exist_ok=True)
with open(self.storage_path, "wt") as fp:
json.dump(self.storage, fp, indent=2)
try:
os.chmod(self.storage_path, stat.S_IWUSR | stat.S_IREAD)
except Exception:
pass
def store(name, data, key, plain): def del_storage(name):
""" """writes directly !"""
Store key to secrets storage.
Args: del self.storage[name]
name (str): Name of the secret. self.save_storage()
data (str): Data to encrypt. if self.verbose:
key (str): Encryption key. If None, randomly generate the key print("Deleted " + name)
plain (bool): If set, stores the data as is, without encryption
Returns:
str: Key used to encrypt, useful if it was generated.
"""
entry = {"encryption": "gpg", "data": data} def store(self, name, data, key, plain):
storage = load_storage() """
if plain: Store key to secrets storage.
entry["encryption"] = "none"
else: Args:
name (str): Name of the secret.
data (str): Data to encrypt.
key (str): Encryption key. If None, randomly generate the key
plain (bool): If set, stores the data as is, without encryption
Returns:
str: Key used to encrypt, useful if it was generated.
"""
entry = {"encryption": "gpg", "data": data}
if plain:
entry["encryption"] = "none"
else:
if key == None:
key = get_random_key()
if self.verbose:
print("Random key: " + key)
entry["data"], ec = self.encrypt(data, key)
if ec != 0:
raise ValueError("Encryption Failed")
self.storage[name] = entry
self.save_storage()
return key
def retrieve(self, name, key=None):
"""
Retrieve a secret from storage
Args:
name (str): Name of the secret.
key (str): Encryption key, if any required.
Returns:
str: Decrypted secret
"""
entry = self.storage[name]
ec = 0
if entry["encryption"] == "none":
value = entry["data"]
else:
value, ec = self.decrypt(entry["data"], key)
if ec != 0:
raise ValueError("Decryption Failed")
return value
def encrypt(self, data, key):
"""Return encrypted message, and exit code. If != 0, encryption failed"""
p = subprocess.run(
["gpg", "-a", "--symmetric", "--batch", "--passphrase-fd", "0"],
input=f"{key}\n{data}".encode(),
capture_output=True,
)
encrypted = p.stdout.decode()
if encrypted == "":
if self.verbose:
print("Encrypt failed!", file=sys.stderr)
None, 1
return encrypted, 0
def decrypt(self, encrypted, key):
"""Return decrypted message, and exit code. If != 0, decryption failed"""
if key == None: if key == None:
key = get_random_key() if self.verbose:
print("Random key: " + key) print("Requires --key!", file=sys.stderr)
entry["data"] = encrypt(data, key) return None, 1
storage[name] = entry p = subprocess.run(
save_storage(storage) ["gpg", "-d", "--batch", "--passphrase-fd", "0"],
return key input=f"{key}\n{encrypted}".encode(),
capture_output=True,
)
def retrieve(name, key=None): data = p.stdout.decode()
""" if data == "":
Retrieve a secret from storage if self.verbose:
print("Decrypt failed!", file=sys.stderr)
Args: return None, 1
name (str): Name of the secret. return data, 0
key (str): Encryption key, if any required.
Returns:
str: Decrypted secret
"""
storage = load_storage()
entry = storage[name]
if entry["encryption"] == "none":
return entry["data"]
return decrypt(entry["data"], key)
def encrypt(data, key): def encrypt(data, key):
p = subprocess.run( spill = Spiller()
["gpg", "-a", "--symmetric", "--batch", "--passphrase-fd", "0"], return spill.encrypt(data, key)[0]
input=f"{key}\n{data}".encode(),
capture_output=True,
)
encrypted = p.stdout.decode()
if encrypted == "":
print("Encrypt failed!", file=sys.stderr)
sys.exit(1)
return encrypted
def decrypt(encrypted, key): def decrypt(encrypted, key):
if key == None: spill = Spiller()
print("Requires --key!", file=sys.stderr) return spill.decrypt(encrypted, key)[0]
sys.exit(1)
p = subprocess.run(
["gpg", "-d", "--batch", "--passphrase-fd", "0"],
input=f"{key}\n{encrypted}".encode(),
capture_output=True,
)
data = p.stdout.decode()
if data == "":
print("Decrypt failed!", file=sys.stderr)
sys.exit(1)
return data
def get_random_key(): def get_random_key():
return "-".join( return "-".join(
[ ["".join([random.choice(string.ascii_letters + string.digits) for x in range(8)]) for x in range(5)]
"".join(
[random.choice(string.ascii_letters + string.digits) for x in range(8)]
)
for x in range(5)
]
) )
CONFIG = get_config()
JSON = os.getenv("SPILLER_STORAGE", CONFIG["SPILLER_STORAGE"])
def main(): def main():
opts = get_opts() opts = get_opts()
spill = Spiller()
spill.verbose = True
if opts.command == "set": if opts.command == "set":
store(opts.name, opts.data, opts.key, opts.plain) spill.store(opts.name, opts.data, opts.key, opts.plain)
if opts.command == "get": if opts.command == "get":
print(retrieve(opts.name, opts.key)) print(spill.retrieve(opts.name, opts.key))
if opts.command == "list": if opts.command == "list":
names = list_storage() print(spill.format_storage())
names.insert(0, ["Name", "Encryption"])
padlen = max([len(x[0]) for x in names])
for row in names:
print(("{:" + str(padlen) + "} {}").format(*row))
if opts.command == "del": if opts.command == "del":
del_storage(opts.name) spill.del_storage(opts.name)