From a2812fc00a02b38581755ac2f41dd0f7fcc42fc6 Mon Sep 17 00:00:00 2001 From: q Date: Mon, 2 Dec 2024 22:35:55 +0200 Subject: [PATCH] rewrite in object mode --- flit/flit.py | 557 +++++++++++++++++++++++++-------------------------- 1 file changed, 272 insertions(+), 285 deletions(-) diff --git a/flit/flit.py b/flit/flit.py index 35a5d4a..0ba2911 100755 --- a/flit/flit.py +++ b/flit/flit.py @@ -15,17 +15,280 @@ and contents are deleted after expiration date. """ -__version__ = "20241202.0" +__version__ = "20241202.1" GLOBAL_CONFIG = os.path.expanduser("~/.config/flit/config.json") CONFIG = ".flit.json" +class Flit: + def __init__(self): + try: + self.global_config = read_default_global_config() + except FileNotFoundError: + write_default_global_config() + self.global_config = read_default_global_config() + self.config_base = self.global_config.get("CONFIG_BASE", CONFIG) + self.opts = parse_opts(self.global_config) + + if self.opts.command == "add": + """Add a new share""" + + new_name = self.random_name() + self.create_new(new_name, self.opts.days, self.opts.description) + print(os.path.join(self.opts.home, new_name)) + print("") + self.copy_files(new_name, self.opts.files) + self.list_folders(verbose=True, filter_name=new_name) + + if self.opts.command == "list" or self.opts.command is None: + """List shares""" + + try: + name = self.opts.name + except AttributeError: + name = None + self.list_folders(verbose=self.opts.verbose, filter_name=name) + + if self.opts.command == "del": + """Delete due shares""" + + self.del_due_folders() + + def random_name(self): + """Random unique name for a new share. Share names follow the syntax [NUM-ABC-DEF], + Where NUM is a 3 digit growing number (to help navigating new folders), but the + rest are random, to ensure secrecy. + + Args: + None + + Returns: + str: New folder name + + """ + while True: + existing_names = [c["name"] for c in self.get_folders()] + existing_numbers = [tryint(num[0:3]) for num in existing_names if tryint(num[0:3]) != None] + if len(existing_numbers) == 0: + index = 0 + else: + index = max(existing_numbers) + 1 + + name = "{:03d}-{}-{}".format( + index, + "".join([random_char() for x in range(3)]), + "".join([random_char() for x in range(3)]), + ) + if not os.path.exists(os.path.join(self.opts.home, name)): + break + return name + + def create_new(self, name, days, description): + """Creates a new folder, and stores configuration + + Args: + name (str): Share name + days (int): Days to store the folder + description (str): Description of the fileshare + + Returns: + None + + """ + os.mkdir(os.path.join(self.opts.home, name)) + now = datetime.now() + del_delta = timedelta(days=days) + del_time = now + del_delta + config = {"description": description, "delete_time": del_time.isoformat(), "created": now.isoformat()} + self.write_config(name, config) + + def get_stats(self, name): + """Get share properties + + Args: + name (str): Name of the share + + Returns: + Dict: Share properties + + """ + + config = self.read_config(name) + + del_time = datetime.fromisoformat(config["delete_time"]) + now = datetime.now() + mtime = datetime.fromisoformat(config["created"]) + + to_del_time = del_time - now + is_due = now > del_time + config["delete_time"] = del_time + config["created"] = mtime + config["name"] = name + config["to_deletion"] = to_del_time + config["due"] = is_due + + return config + + def del_due_folders(self): + """Delete folders where due date has passed""" + folders = self.get_folders() + for c in folders: + if c["due"]: + print("Deleting {}".format(c["name"])) + shutil.rmtree(os.path.join(self.opts.home, c["name"])) + + def get_folders(self): + """Get share properties for all the shares + + Args: + None: + + Returns: + List: List that contain `get_stats(p)` for every share + + """ + + dir_list = [ + p for p in os.listdir(self.opts.home) if os.path.exists(os.path.join(self.opts.home, p, self.config_base)) + ] + configs = [self.get_stats(p) for p in dir_list] + configs.sort(key=lambda c: c["created"], reverse=True) + return configs + + def copy_files(self, new_name, files): + """Copy files to a new share + + Args: + home (str): Flit home + new_name (str): Name of the new share + files (list): List of paths to copy + + Returns: + None + + """ + target = os.path.abspath(os.path.join(self.opts.home, new_name)) + for f in files: + print(f"Copying: {f}") + source = f + if source.endswith("/"): + source = source[0:-1] + base = os.path.basename(source) + if not os.path.exists(source): + print("Source path does not exist") + continue + if os.path.isfile(source): + shutil.copy2(source, target) + else: + shutil.copytree( + source, + os.path.join(target, base), + symlinks=True, + ) + + def list_folders(self, verbose=False, filter_name=None): + """Show folder list + + Args: + root_url (str): URL to root of the share system + verbose (bool): If set true, displays folder contents + filter_name (str): If set, display only one folder matching the name + + Returns: + None + + """ + + folders = self.get_folders() + header_format = "{:" + str(12 + len(self.opts.root)) + "} {} {} {} {}" + print(header_format.format("URL", "Created", "ToDelete", "InDays", "Description")) + for c in folders: + if filter_name is not None: + if filter_name != c["name"]: + continue + due = "*" if c["due"] else " " + print( + "{}/{}/ {} {} {: 4d}d{} {}".format( + self.opts.root, + c["name"], + c["created"].isoformat()[0:10], + c["delete_time"].isoformat()[0:10], + c["to_deletion"].days, + due, + c["description"], + ) + ) + if verbose: + sub_files = self.get_sub_files(c["name"]) + for sp in sub_files: + print(" {}/{}/{}".format(self.opts.root, c["name"], quote(sp, "/"))) + + def get_sub_files(self, name): + """Get 1st level files in a share + + Args: + name (str): Name of the share + + Returns: + List: Folders and Files in the share + + """ + + file_list = sorted([x for x in os.listdir(os.path.join(self.opts.home, name)) if not x.startswith(".")]) + dir_list = [x + "/" for x in file_list if os.path.isdir(os.path.join(self.opts.home, name, x))] + file_list = [x for x in file_list if os.path.isfile(os.path.join(self.opts.home, name, x))] + return dir_list + file_list + + def read_config(self, name): + """Read the share configuration file + + Args: + name (str): Share name + + Returns: + Dict: Share config + + Example: + { + "created": "2020-12-28T23:04:07.275200", + "delete_time": "2020-12-29T23:04:07.275200", + "description": "Test share" + } + + """ + with open(os.path.join(self.opts.home, name, self.config_base), "rt") as fp: + return json.load(fp) + + def write_config(self, name, config): + """Write the share configuration file + + Args: + name (str): Share name + config (Dict): Share configuration + + Returns: + None + """ + + with open(os.path.join(self.opts.home, name, self.config_base), "wt") as fp: + json.dump(config, fp, indent=2, sort_keys=True) + + def write_default_global_config(): os.makedirs(os.path.dirname(GLOBAL_CONFIG), exist_ok=True) with open(os.path.join(GLOBAL_CONFIG), "wt") as fp: - json.dump({"ROOT_URL": "https://my.website.net", "HOME_DIR": os.getcwd()}, fp, indent=2, sort_keys=True) + json.dump( + { + "ROOT_URL": "https://my.website.net", + "HOME_DIR": os.getcwd(), + "CONFIG_BASE": f".flit.{''.join([random_char() for x in range(3)])}.json", + }, + fp, + indent=2, + sort_keys=True, + ) print(f"New template config written. See '{GLOBAL_CONFIG}'") @@ -71,6 +334,7 @@ def parse_opts(global_config): default=False, help="Increase verbosity", ) + parser.add_argument("--version", action="version", version="%(prog)s {version}".format(version=__version__)) parser.add_argument( "--root", @@ -136,7 +400,11 @@ def parse_opts(global_config): ) del_parser = subparsers.add_parser("del", add_help=False) - return parser.parse_args() + parsed = parser.parse_args() + if not os.path.isdir(parsed.home): + print("Home folder must exist", file=sys.stderr) + sys.exit(1) + return parsed def random_char(): @@ -152,248 +420,6 @@ def random_char(): return random.choice(string.ascii_lowercase + string.digits) -def random_name(): - """Random unique name for a new share. Share names follow the syntax [NUM-ABC-DEF], - Where NUM is a 3 digit growing number (to help navigating new folders), but the - rest are random, to ensure secrecy. - - Args: - None - - Returns: - str: New folder name - - """ - while True: - existing_names = [c["name"] for c in get_folders()] - existing_numbers = [tryint(num[0:3]) for num in existing_names if tryint(num[0:3]) != None] - if len(existing_numbers) == 0: - index = 0 - else: - index = max(existing_numbers) + 1 - if index > 999: - for index in range(1000): - if not index in existing_numbers: - break - - name = "{:03d}-{}-{}".format( - index, - "".join([random_char() for x in range(3)]), - "".join([random_char() for x in range(3)]), - ) - if not os.path.exists(name): - break - return name - - -def create_new(p, days, description): - """Creates a new folder, and stores configuration - - Args: - p (str): Share name - days (int): Days to store the folder - description (str): Description of the fileshare - - Returns: - None - - """ - os.mkdir(p) - now = datetime.now() - del_delta = timedelta(days=days) - del_time = now + del_delta - config = {} - config["description"] = description - config["delete_time"] = del_time.isoformat() - config["created"] = now.isoformat() - write_config(p, config) - - -def copy_files(cwd, new_name, files): - """Copy files to a new share - - Args: - c (str): Working directory when the script was started - new_name (str): Name of the new share - files (list): List of paths to copy - - Returns: - None - - """ - target = os.path.abspath(new_name) - for f in files: - print(f"Copying: {f}") - source = os.path.join(cwd, f) - if source.endswith("/"): - source = source[0:-1] - base = os.path.basename(source) - if not os.path.exists(source): - print("Path does not exist") - continue - if os.path.isfile(source): - shutil.copy2(source, target) - else: - shutil.copytree( - source, - os.path.join(target, base), - symlinks=True, - ) - - -def get_stats(p): - """Get share properties - - Args: - p (str): Name of the share - - Returns: - Dict: Share properties - - """ - - config = read_config(p) - - del_time = datetime.fromisoformat(config["delete_time"]) - now = datetime.now() - mtime = datetime.fromisoformat(config["created"]) - - to_del_time = del_time - now - is_due = now > del_time - config["delete_time"] = del_time - config["created"] = mtime - config["name"] = p - config["to_deletion"] = to_del_time - config["due"] = is_due - - return config - - -def get_sub_files(p): - """Get 1st level files in a share - - Args: - p (str): Name of the share - - Returns: - List: Folders and Files in the share - - """ - - file_list = sorted([x for x in os.listdir(p) if not x.startswith(".")]) - dir_list = [x + "/" for x in file_list if os.path.isdir(os.path.join(p, x))] - file_list = [x for x in file_list if os.path.isfile(os.path.join(p, x))] - return dir_list + file_list - - -def get_folders(): - """Get share properties for all the shares - - Args: - None: - - Returns: - List: List that contain `get_stats(p)` for every share - - """ - - dir_list = [p for p in os.listdir(".") if os.path.exists(os.path.join(p, CONFIG))] - configs = [get_stats(p) for p in dir_list] - configs.sort(key=lambda c: c["created"], reverse=True) - return configs - - -def list_folders(root_url, verbose=False, filter_name=None): - """Show folder list - - Args: - root_url (str): URL to root of the share system - verbose (bool): If set true, displays folder contents - filter_name (str): If set, display only one folder matching the name - - Returns: - None - - """ - - folders = get_folders() - header_format = "{:" + str(12 + len(root_url)) + "} {} {} {} {}" - print(header_format.format("URL", "Created", "ToDelete", "InDays", "Description")) - for c in folders: - if filter_name is not None: - if filter_name != c["name"]: - continue - due = "*" if c["due"] else " " - print( - "{}/{}/ {} {} {: 4d}d{} {}".format( - root_url, - c["name"], - c["created"].isoformat()[0:10], - c["delete_time"].isoformat()[0:10], - c["to_deletion"].days, - due, - c["description"], - ) - ) - if verbose: - sub_files = get_sub_files(c["name"]) - for sp in sub_files: - print(" {}/{}/{}".format(root_url, c["name"], quote(sp, "/"))) - - -def del_due_folders(): - """Delete folders where due date has passed - - Args: - None - - Returns: - None - - """ - folders = get_folders() - for c in folders: - if c["due"]: - print("Deleting {}".format(c["name"])) - shutil.rmtree(c["name"]) - - -def read_config(p): - """Read the share configuration file - - Args: - p (str): Share name - - Returns: - Dict: Share config - - Example: - { - "created": "2020-12-28T23:04:07.275200", - "delete_time": "2020-12-29T23:04:07.275200", - "description": "Test share" - } - - """ - with open(os.path.join(p, CONFIG), "rt") as fp: - return json.load(fp) - - -def write_config(p, config): - """Write the share configuration file - - Args: - p (str): Share name - config (Dict): Share configuration - - Returns: - None - """ - - with open(os.path.join(p, CONFIG), "wt") as fp: - return json.dump(config, fp, indent=2, sort_keys=True) - - def tryint(s): """Return integer, or None @@ -411,43 +437,4 @@ def tryint(s): def main(): - try: - global_config = read_default_global_config() - except FileNotFoundError: - write_default_global_config() - global_config = read_default_global_config() - - opts = parse_opts(global_config) - cwd = os.getcwd() - if opts.home == None: - os.chdir(os.path.dirname(__file__)) - else: - os.chdir(opts.home) - - if opts.command == "add": - """Add a new share""" - - new_name = random_name() - create_new(new_name, opts.days, opts.description) - print(os.path.abspath(new_name)) - print("") - copy_files(cwd, new_name, opts.files) - list_folders(opts.root, verbose=True, filter_name=new_name) - - if opts.command == "list" or opts.command is None: - """List shares""" - - name = None - try: - name = opts.name - except AttributeError: - pass - if name: - if os.path.isdir(os.path.join(cwd, name)): - name = os.path.basename(os.path.realpath(os.path.join(cwd, name))) - list_folders(opts.root, verbose=opts.verbose, filter_name=name) - - if opts.command == "del": - """Delete due shares""" - - del_due_folders() + flit = Flit()