#!/usr/bin/env python3
"""Show the output of ``df -T`` in colour, optionally refreshing in place.

In the default mode the screen is redrawn every ``-n`` seconds and each
filesystem line shows how its free space changed since program start and
the recent rate of change.  With ``-1`` a single table is printed instead.

Per-filesystem state is kept in ``stats["running"]`` as 7-tuples:

    (free_bytes, speed_history, mount_point, free_bytes_at_start,
     usage_percent_string, filesystem_type, total_bytes)
"""
import argparse
import sys, os, glob
import shutil
from datetime import datetime
from datetime import timedelta
import re, signal, time
import subprocess

VERSION = 3

# ANSI SGR attribute codes, combined by c().
W = "30"
R = "31"
G = "32"
Y = "33"
B = "34"
M = "35"
C = "36"
S = "1"
E = "0"
BR = "41"

# Complete ANSI control sequences.
CLR = "\033[2J"
SAVE = "\033[s"
LOAD = "\033[u"
CLRLN = "\033[K"
CLRBLN = "\033[1K"
DOWN = "\033[1B"

# Sort entries by the lower-cased last path component of the mount point.
SORTKEY = lambda key: (key[2].split("/")[-1].lower())

# Number of samples kept in each entry's speed history.
_HISTORY_LENGTH = 5
# Visible width of the usage column ("100%" is the widest value).
_USAGE_WIDTH = 4


def setup_options(argv=None):
    """Parse the command line options.

    Args:
        argv: Argument list to parse.  ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point uses.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(
        description="Shows the output of df in colour.",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--no-colors",
        "--nc",
        action="store_false",
        dest="colors",
        default=True,
        help="Disable colored output",
    )
    parser.add_argument("-n", type=int, dest="delay", default=3, help="Refresh delay")
    parser.add_argument(
        "-1", action="store_true", dest="once", default=False, help="Run once and exit"
    )
    parser.add_argument(
        "-x",
        "--exclude",
        type=str,
        dest="exclude",
        default="tmpfs,devtmpfs,squashfs,overlay",
        help="Comma separated list of excluded filesystem types. Defaults: %(default)s",
    )
    # argparse requires the version to be a string; an int raises TypeError
    # when --version is actually used.
    parser.add_argument("--version", action="version", version=str(VERSION))
    parser.add_argument(
        "paths",
        action="store",
        nargs="*",
        help="Only include disks. Defaults to all.",
    )
    return parser.parse_args(argv)


def c(attribs):
    """Return the ANSI escape sequence for the given attribute codes.

    Args:
        attribs: An iterable of SGR code strings, or a single code string.

    Returns:
        The escape sequence, or "" when colours are disabled in ``options``.
    """
    if not options.colors:
        return ""
    if isinstance(attribs, str):
        # A bare string would otherwise be joined character by character,
        # turning e.g. "41" into "4;1".
        attribs = (attribs,)
    return "\033[" + ";".join(attribs) + "m"


def pos(y, x):
    """Return the ANSI sequence that moves the cursor to row y, column x."""
    return "\033[" + str(y) + ";" + str(x) + "H"


def colorize(string):
    """Apply every pattern/replacement pair in ``color_match`` to a string.

    Returns the string unchanged when colours are disabled.
    """
    if not options.colors:
        return string
    for pattern, replacement in color_match.values():
        string = pattern.sub(replacement, string)
    return string


def count_running(string, stats):
    """Fold one data line of ``df -T`` output into ``stats``.

    The line is expected to hold seven columns: device, type, total blocks,
    used, available, usage percent and mount point.  Sizes are multiplied by
    1024 to get bytes.  Lines with fewer columns or non-numeric sizes are
    ignored.

    Args:
        string: One line of ``df -T`` output.
        stats: The state dict; ``running``, ``files`` and ``totals`` are
            updated in place and ``delay`` is read.

    Returns:
        The same ``stats`` dict.
    """
    # Split at most six times so a mount point containing spaces stays whole.
    fields = string.strip().split(None, 6)
    if len(fields) != 7:
        return stats
    fs_type, usage, mount = fields[1], fields[5], fields[6]
    try:
        total_bytes = int(fields[2]) * 1024
        free_bytes = int(fields[4]) * 1024
    except ValueError:
        # Header line or a filesystem reporting "-" instead of numbers.
        return stats

    if mount in stats["files"]:
        index = stats["files"].index(mount)
        previous = stats["running"][index]
        # Drop the oldest sample and append the newest rate of change.
        speed_history = previous[1][1:]
        delay = stats["delay"]
        change = free_bytes - previous[0]
        speed_history.append(change / delay if delay > 0 else 0)
        stats["running"][index] = (
            free_bytes,
            speed_history,
            mount,
            previous[3],  # free space at program start is kept
            usage,
            fs_type,
            total_bytes,
        )
    else:
        stats["running"].append(
            (
                free_bytes,
                [0] * _HISTORY_LENGTH,
                mount,
                free_bytes,
                usage,
                fs_type,
                total_bytes,
            )
        )
    stats["running"].sort(key=SORTKEY)
    stats["files"] = [entry[2] for entry in stats["running"]]
    total_free = sum(entry[0] for entry in stats["running"])
    total = sum(entry[6] for entry in stats["running"])
    stats["totals"] = [total_free, total]
    return stats


class EndProgram(Exception):
    """Raised to leave the refresh loop and exit cleanly."""


def is_number(s):
    """Return True when ``float(s)`` succeeds, False otherwise."""
    try:
        float(s)
    except (TypeError, ValueError, OverflowError):
        return False
    return True


def str_short(s, stats):
    """Shorten a path so it is shorter than the terminal width minus 16.

    Strings already short enough are returned unchanged.  Otherwise the part
    before the first "/" is kept, followed by "/..." and the tail of the
    remaining path.
    """
    max_length = stats["size"][1] - 16
    if len(s) < max_length:
        return s
    parts = s.split("/")
    keep = max_length - len(parts[0]) - 5
    remainder = "/".join(parts[1:])
    # A slice of [-0:] would return the whole remainder, so guard keep <= 0.
    tail = remainder[-keep:] if keep > 0 else ""
    return parts[0] + "/..." + tail


def print_stats(stats):
    """Redraw the live view: title, header, one row per disk and totals.

    Writes to stdout using absolute cursor positioning.  Only the title line
    is drawn when no filesystem has been recorded yet.
    """
    sys.stdout.write(
        pos(1, 0)
        + c((S, C))
        + "= DISK FREE = "
        + c((E,))
        + human_time(stats["time"])
        + "=>"
        + c((S, G))
        + human_time()
        + c((E,))
        + CLRLN
    )
    entries = stats["running"]
    if not entries:
        return
    # NOTE(review): header spacing is reproduced as found; it does not line
    # up with the right-justified columns below - confirm intended layout.
    sys.stdout.write(
        pos(2, 0)
        + " TotalDiff Free Total (usage%) Diff/s (positive=more free space)"
        + CLRLN
    )
    for index, entry in enumerate(entries):
        free, history, mount, start_free, usage, fs_type, total = entry
        columns = [
            human_size(free - start_free).rjust(10),
            human_size(free).rjust(10),
            human_size(total).rjust(8),
            colorize_usage(usage),
            (human_size(mean_speed(history)) + "/s").rjust(10),
            mount,
            "(" + fs_type + ")",
        ]
        sys.stdout.write(
            pos(3 + index, 0)
            + "("
            + str(index + 1).rjust(2)
            + ") "
            + " ".join(columns)
            + CLRLN
        )
    count = len(entries)
    sys.stdout.write(
        pos(3 + count, 0)
        + "Total:"
        + " ".join(
            [
                " ".rjust(9),
                human_size(stats["totals"][0]).rjust(10),
                "/",
                human_size(stats["totals"][1]).ljust(10),
            ]
        )
        + CLRLN
    )
    # Blank the rows below the table so stale lines from a longer previous
    # listing disappear.
    for offset in range(stats["size"][0] - 7 - count):
        sys.stdout.write(pos(4 + count + offset, 0) + " " + CLRLN)
    sys.stdout.write(DOWN + CLRBLN + CLRLN)


def print_stats_once(stats):
    """Print a one-shot table: title, header, one row per disk and totals.

    Only the title line is printed when no filesystem has been recorded.
    """
    sys.stdout.write(c((S, C)) + "= DISK FREE = " + c((E,)) + CLRLN + "\n")
    if not stats["running"]:
        return
    # NOTE(review): header spacing is reproduced as found - confirm layout.
    sys.stdout.write(" Total Used Use% Free" + CLRLN + "\n")
    for entry in stats["running"]:
        free, _history, mount, _start_free, usage, fs_type, total = entry
        columns = [
            human_size(total).rjust(8),
            human_size(total - free).rjust(10),
            colorize_usage(usage),
            human_size(free).rjust(10),
            mount,
            "(" + fs_type + ")",
        ]
        sys.stdout.write(" ".join(columns) + CLRLN + "\n")
    total_free, total = stats["totals"]
    sys.stdout.write(
        " ".join(
            [
                human_size(total).rjust(8),
                human_size(total - total_free).rjust(10),
                # Placeholder as wide as the usage column keeps "Free" aligned.
                " " * _USAGE_WIDTH,
                human_size(total_free).rjust(10),
            ]
        )
        + CLRLN
        + "\n"
    )


def colorize_usage(string):
    """Colour a usage string such as "42%" and pad it to four characters.

    Above 95 is bold red, below 80 is bold green, anything else is bold
    yellow.  A value that is not a number is shown in green.
    """
    try:
        usage = int(string.rstrip("%"))
    except ValueError:
        usage = 0
    if usage > 95:
        colour = R
    elif usage < 80:
        colour = G
    else:
        colour = Y
    # Constant width keeps the following columns aligned for every value.
    return c((S, colour)) + string.rjust(_USAGE_WIDTH) + c((E,))


def mean_speed(history):
    """Return the arithmetic mean of the samples as an int (0 if empty)."""
    if not history:
        return 0
    return int(sum(history) / len(history))


def human_time(dt=False):
    """Format a datetime as HH:MM:SS; a falsy argument means "now"."""
    if not dt:
        dt = datetime.now()
    return dt.strftime("%H:%M:%S")


def human_size(size, precision=1):
    """Format a byte count with a binary-scaled suffix.

    Args:
        size: Number of bytes; may be negative.  ``None`` yields "nan".
        precision: Decimals shown once the value has been scaled; plain
            byte values are shown without decimals.

    Returns:
        A string such as "512B", "1.5KB" or "-2.0GB".
    """
    if size is None:
        return "nan"
    sign = ""
    if size < 0:
        sign = "-"
        size = -size
    suffixes = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"]
    suffix_index = 0
    shown_precision = 0
    # Stop at the last suffix so huge values cannot index past the list.
    while size >= 1024 and suffix_index < len(suffixes) - 1:
        suffix_index += 1
        size = size / 1024.0
        shown_precision = precision
    return "%s%.*f%s" % (sign, shown_precision, size, suffixes[suffix_index])


def readinput(lf):
    """Read one line from ``lf.stdout``; return "CleanerTimeout" on error."""
    try:
        return lf.stdout.readline()
    except Exception:
        return "CleanerTimeout"


def termsize():
    """Return the terminal size as ``(rows, columns)``.

    Falls back to 25 rows by 80 columns when the size cannot be determined.
    """
    size = shutil.get_terminal_size(fallback=(80, 25))
    return (int(size.lines), int(size.columns))


def _build_color_match():
    """Build the regex/replacement table used by colorize()."""
    return {
        "err": (re.compile(r"(Failed)"), c([R, S]) + "\\1" + c([E])),
        "done": (re.compile(r"(Done)"), c([G, S]) + "\\1" + c([E])),
        "percent": (re.compile(r"([0-9]+%)"), c([Y, S]) + "\\1" + c([E])),
    }


# Defaults so the helpers work when this file is imported; main() replaces
# both with values derived from the real command line.
options = setup_options([])
color_match = _build_color_match()


def main():
    """Parse the command line, then print once or refresh until interrupted."""
    global options, color_match
    options = setup_options()
    color_match = _build_color_match()
    stats = {
        "time": datetime.now(),
        "running": [],
        "files": [],
        "totals": [],
        "size": termsize(),
        "delay": options.delay,
    }
    if not options.once:
        sys.stdout.write(CLR + pos(0, 0) + "Launching...")

    omit_opts = []
    for omit in options.exclude.split(","):
        omit = omit.strip()
        if omit:  # an empty entry would pass a meaningless `-x ""` to df
            omit_opts.extend(["-x", omit])
    command = ["df", "-T"] + omit_opts + options.paths

    while True:
        try:
            output = subprocess.run(command, stdout=subprocess.PIPE).stdout
            if not output:
                raise EndProgram
            # First line is the df column header.
            for line in output.decode("utf-8", errors="replace").split("\n")[1:]:
                stats = count_running(line, stats)
            if options.once:
                print_stats_once(stats)
                sys.exit(0)
            print_stats(stats)
            sys.stdout.flush()
            # time.sleep rejects negative values.
            time.sleep(max(options.delay, 0))
        except (EndProgram, KeyboardInterrupt):
            sys.stdout.write(DOWN + "\n")
            sys.stdout.flush()
            sys.exit(0)


if __name__ == "__main__":
    main()