183 lines
5.3 KiB
Python
Executable File
183 lines
5.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import os
|
|
import random
|
|
import stat
|
|
import string
|
|
import subprocess
|
|
import sys
|
|
|
|
DEFAULT_CONFIG = os.path.expanduser("~/.config/spiller/config.json")
|
|
DEFAULT_STORAGE = os.path.expanduser("~/.config/spiller/storage.json")
|
|
|
|
|
|
class Spiller:
|
|
def __init__(self, storage_path=None):
|
|
self.config = self.get_config()
|
|
if storage_path is None:
|
|
self.storage_path = os.getenv("SPILLER_STORAGE", self.config["SPILLER_STORAGE"])
|
|
else:
|
|
self.storage_path = storage_path
|
|
self.load_storage()
|
|
self.verbose = False
|
|
|
|
def get_config(self):
|
|
default_config = {"SPILLER_STORAGE": DEFAULT_STORAGE}
|
|
|
|
try:
|
|
with open(DEFAULT_CONFIG, "rt") as fp:
|
|
default_config.update(json.load(fp))
|
|
except Exception:
|
|
pass
|
|
return default_config
|
|
|
|
def list_storage(self):
|
|
"""
|
|
Get list of keys in the secret storage
|
|
|
|
Args:
|
|
|
|
Returns:
|
|
List[List]: List of name and encryption method
|
|
"""
|
|
|
|
names = []
|
|
for name in sorted(self.storage.keys()):
|
|
names.append([name, self.storage[name]["encryption"]])
|
|
names.sort(key=lambda x: x[1])
|
|
return names
|
|
|
|
def format_storage(self):
|
|
"""Format storage listing for printing"""
|
|
|
|
names = self.list_storage()
|
|
names.insert(0, ["Name", "Encryption"])
|
|
padlen = max([len(x[0]) for x in names])
|
|
values = [("{:" + str(padlen) + "} {}").format(*row) for row in names]
|
|
return "\n".join(values)
|
|
|
|
def load_storage(self):
|
|
try:
|
|
with open(self.storage_path, "rt") as fp:
|
|
self.storage = json.load(fp)
|
|
except FileNotFoundError:
|
|
self.storage = {}
|
|
|
|
def save_storage(self):
|
|
if not os.path.exists(self.storage_path):
|
|
os.makedirs(os.path.dirname(self.storage_path), exist_ok=True)
|
|
with open(self.storage_path, "wt") as fp:
|
|
json.dump(self.storage, fp, indent=2)
|
|
try:
|
|
os.chmod(self.storage_path, stat.S_IWUSR | stat.S_IREAD)
|
|
except Exception:
|
|
pass
|
|
|
|
def del_storage(self, name):
|
|
"""writes directly !"""
|
|
|
|
del self.storage[name]
|
|
self.save_storage()
|
|
if self.verbose:
|
|
print("Deleted " + name, file=sys.stderr)
|
|
|
|
def store(self, name, data, key, plain):
|
|
"""
|
|
Store key to secrets storage.
|
|
|
|
Args:
|
|
name (str): Name of the secret.
|
|
data (str): Data to encrypt.
|
|
key (str): Encryption key. If None, randomly generate the key
|
|
plain (bool): If set, stores the data as is, without encryption
|
|
Returns:
|
|
str: Key used to encrypt, useful if it was generated.
|
|
"""
|
|
|
|
entry = {"encryption": "gpg", "data": data}
|
|
if plain:
|
|
entry["encryption"] = "none"
|
|
else:
|
|
if key == None:
|
|
key = get_random_key()
|
|
if self.verbose:
|
|
print("Random key: " + key, file=sys.stderr)
|
|
entry["data"], ec = self.encrypt(data, key)
|
|
if ec != 0:
|
|
raise ValueError("Encryption Failed")
|
|
self.storage[name] = entry
|
|
self.save_storage()
|
|
return key
|
|
|
|
def retrieve(self, name, key=None):
|
|
"""
|
|
Retrieve a secret from storage
|
|
|
|
Args:
|
|
name (str): Name of the secret.
|
|
key (str): Encryption key, if any required.
|
|
|
|
Returns:
|
|
str: Decrypted secret
|
|
"""
|
|
|
|
entry = self.storage[name]
|
|
ec = 0
|
|
if entry["encryption"] == "none":
|
|
value = entry["data"]
|
|
else:
|
|
value, ec = self.decrypt(entry["data"], key)
|
|
if ec != 0:
|
|
raise ValueError("Decryption Failed")
|
|
return value
|
|
|
|
def encrypt(self, data, key):
|
|
"""Return encrypted message, and exit code. If != 0, encryption failed"""
|
|
p = subprocess.run(
|
|
["gpg", "-a", "--symmetric", "--batch", "--passphrase-fd", "0"],
|
|
input=f"{key}\n{data}".encode(),
|
|
capture_output=True,
|
|
)
|
|
encrypted = p.stdout.decode()
|
|
if encrypted == "":
|
|
if self.verbose:
|
|
print("Encrypt failed!", file=sys.stderr)
|
|
return None, 1
|
|
return encrypted, 0
|
|
|
|
def decrypt(self, encrypted, key):
|
|
"""Return decrypted message, and exit code. If != 0, decryption failed"""
|
|
if key == None:
|
|
if self.verbose:
|
|
print("Requires --key!", file=sys.stderr)
|
|
return None, 1
|
|
|
|
p = subprocess.run(
|
|
["gpg", "-d", "--batch", "--passphrase-fd", "0"],
|
|
input=f"{key}\n{encrypted}".encode(),
|
|
capture_output=True,
|
|
)
|
|
data = p.stdout.decode()
|
|
if data == "":
|
|
if self.verbose:
|
|
print("Decrypt failed!", file=sys.stderr)
|
|
return None, 1
|
|
return data, 0
|
|
|
|
|
|
def encrypt(data, key):
|
|
spill = Spiller()
|
|
return spill.encrypt(data, key)[0]
|
|
|
|
|
|
def decrypt(encrypted, key):
|
|
spill = Spiller()
|
|
return spill.decrypt(encrypted, key)[0]
|
|
|
|
|
|
def get_random_key():
|
|
return "-".join(
|
|
["".join([random.choice(string.ascii_letters + string.digits) for x in range(8)]) for x in range(5)]
|
|
)
|