Files
q-tools/files/diskfree-tracker
2021-03-01 10:10:41 +02:00

373 lines
9.4 KiB
Python
Executable File

#!/usr/bin/env python3
import sys, os, glob
from datetime import datetime
from datetime import timedelta
import re, signal, time
import subprocess
# ,threading
# Version number of this script.
VERSION = 3

# ANSI SGR attribute codes, kept as strings so c() can join them with ";".
W, R, G, Y, B, M, C = "30", "31", "32", "33", "34", "35", "36"  # foreground colours
S, E = "1", "0"  # bold / reset all attributes
BR = "41"  # red background

# ANSI control sequences for the screen and the cursor.
CLR = "\033[2J"  # erase the whole screen
SAVE, LOAD = "\033[s", "\033[u"  # save / restore the cursor position
CLRLN = "\033[K"  # erase from the cursor to the end of the line
CLRBLN = "\033[1K"  # erase from the start of the line to the cursor
DOWN = "\033[1B"  # move the cursor one line down


def SORTKEY(key):
    """Sort key for a stats record: lower-cased last component of its mount point (item 2)."""
    mount_point = key[2]
    return mount_point.rsplit("/", 1)[-1].lower()
def setup_options():
    """Parse the command line and return the resulting argparse namespace.

    Attributes of the returned namespace:
        colors  -- False when --no-colors / --nc is given, otherwise True
        delay   -- refresh delay (-n), an int, default 3
        once    -- True when -1 is given: run once and exit
        exclude -- comma separated filesystem types to exclude (-x)
        paths   -- optional list of paths; empty means all disks
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="\nShows the output of df in colour.\n",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--no-colors",
        "--nc",
        action="store_false",
        dest="colors",
        default=True,
        help="Disable colored output",
    )
    parser.add_argument("-n", type=int, dest="delay", default=3, help="Refresh delay")
    parser.add_argument(
        "-1", action="store_true", dest="once", default=False, help="Run once and exit"
    )
    parser.add_argument(
        "-x",
        "--exclude",
        type=str,
        dest="exclude",
        default="tmpfs,devtmpfs,squashfs,overlay",
        help="Comma separated list of excluded filesystem types. Defaults: %(default)s",
    )
    # argparse formats the version text as a string; passing the integer
    # VERSION directly makes `--version` fail with a TypeError.
    parser.add_argument("--version", action="version", version=str(VERSION))
    parser.add_argument(
        "paths", action="store", nargs="*", help="Only include disks. Defaults to all."
    )
    return parser.parse_args()
def c(attribs):
    """Return the ANSI SGR escape sequence selecting *attribs*.

    attribs is an iterable of attribute code strings, e.g. (S, G).  A single
    code may also be given as a bare string.  Returns "" when colours are
    disabled (options.colors is false).
    """
    if not options.colors:
        return ""
    if isinstance(attribs, str):
        # Callers write c((E)), which is the plain string E rather than a
        # one-element tuple.  Joining a string character by character would
        # turn a two-digit code such as "41" into "4;1", so wrap it first.
        attribs = (attribs,)
    return "\033[" + ";".join(attribs) + "m"
def pos(y, x):
    """Build the ANSI escape sequence that moves the cursor to row *y*, column *x*."""
    return "\033[%s;%sH" % (y, x)
def colorize(string):
    """Return *string* with every rule in color_match applied.

    Each rule is a (compiled pattern, replacement) pair; the rules are applied
    one after another.  The string is returned untouched when colours are
    disabled.
    """
    if options.colors:
        for pattern, replacement in color_match.values():
            string = pattern.sub(replacement, string)
    return string
def count_running(string, stats):
    """Fold one line of `df -T` output into *stats* and return *stats*.

    Lines that do not split into exactly seven whitespace separated fields are
    ignored.  Each entry of stats["running"] is a tuple of
    (free bytes, history of the last five change rates in bytes/s,
    mount point, free bytes when first seen, usage string, filesystem type,
    total bytes).  stats["files"] mirrors the mount points in the same order
    and stats["totals"] holds [total free bytes, total bytes].
    """
    fields = string.split()
    if len(fields) != 7:
        return stats
    _device, fstype, total_kb, _used_kb, free_kb, usage, mount = fields

    records = stats["running"]
    if mount in stats["files"]:
        # Known mount point: drop the oldest rate and add the newest one.
        slot = stats["files"].index(mount)
        previous = records[slot]
        free_now = int(free_kb) * 1024
        rate = (free_now - previous[0]) / stats["delay"]
        records[slot] = (
            free_now,
            previous[1][1:] + [rate],
            mount,
            previous[3],
            usage,
            fstype,
            int(total_kb) * 1024,
        )
    else:
        # First sighting: the current free space is also the baseline.
        free_now = int(free_kb) * 1024
        records.append(
            (free_now, [0] * 5, mount, free_now, usage, fstype, int(total_kb) * 1024)
        )

    records.sort(key=SORTKEY)
    stats["files"] = [record[2] for record in records]
    stats["totals"] = [
        sum(record[0] for record in records),
        sum(record[6] for record in records),
    ]
    return stats
class EndProgram(Exception):
    """Raised to leave the main loop and exit the program cleanly."""
def is_number(s):
    """Return True when *s* can be converted with float(), otherwise False."""
    try:
        float(s)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "not a number"; anything else
        # (e.g. KeyboardInterrupt) must not be swallowed here.
        return False
    return True
def str_short(s, stats):
    """Shorten the path *s* so that it fits the screen.

    The limit is the terminal width (stats["size"][1]) minus 16.  A string at
    or over the limit keeps its first "/"-separated component, followed by
    "/..." and the tail end of the remaining components.
    """
    limit = stats["size"][1] - 16
    if len(s) >= limit:
        head, *rest = s.split("/")
        tail = "/".join(rest)
        keep = limit - len(head) - 5
        return head + "/..." + tail[-keep:]
    return s
# Redraws the live view in place with absolute cursor positioning: a title
# line, a column header, one row per entry of stats["running"], a totals row,
# and blanked-out rows below.  Nothing is returned; everything is written to
# sys.stdout (the caller flushes).
#
# Record layout used below (as built by count_running): [0] free bytes,
# [1] change-rate history, [2] mount point, [3] free bytes when first seen,
# [4] usage string, [5] filesystem type, [6] total bytes.
def print_stats(stats):
""" Prints logged errors, and the status line """
# sys.stdout.write(SAVE)
# Row offset added to every screen row below; always 0 here.
e = 0
# Title line: program start time (stats["time"]) "=>" current time.
sys.stdout.write(
pos(e + 1, 0)
+ c((S, C))
+ "= DISK FREE = "
+ c((E))
+ human_time(stats["time"])
+ "=>"
+ c((S, G))
+ human_time()
+ c((E))
+ CLRLN
)
# Nothing more to draw until at least one filesystem has been recorded.
if stats["running"]:
pass
else:
return
sys.stdout.write(
pos(e + 2, 0)
+ " TotalDiff Free Total (usage%) Diff/s (positive=more free space)"
+ CLRLN
)
# ex is an (index, record) pair; one screen row per record, from row 3 on.
for ex in enumerate(stats["running"]):
sys.stdout.write(
pos(e + 3 + ex[0], 0)
+ "("
+ str(ex[0] + 1).rjust(2)
+ ") "
+ " ".join(
[
# Free space now minus free space when first seen.
human_size(ex[1][0] - ex[1][3]).rjust(10),
human_size(ex[1][0]).rjust(10),
human_size(ex[1][6]).rjust(8),
colorize_usage(ex[1][4]),
# Mean of the recorded change rates.
(human_size(mean_speed(ex[1][1])) + "/s").rjust(10),
ex[1][2],
"(" + ex[1][5] + ")",
]
)
+ CLRLN
)
# NOTE(review): indentation is not visible in this view; presumably the
# statements below run after the loop, with ex still holding the last
# (index, record) pair so the totals row lands one row below the last record.
sys.stdout.write(
pos(e + 4 + ex[0], 0)
+ "Total:"
+ " ".join(
[
" ".rjust(9),
human_size(stats["totals"][0]).rjust(10),
"/",
human_size(stats["totals"][1]).ljust(10),
]
)
+ CLRLN
)
# Blank the rows below the totals row; the count is derived from the terminal
# height in stats["size"][0].
for i in range(stats["size"][0] - 7 - len(stats["running"])):
sys.stdout.write(pos(e + 5 + ex[0] + i, 0) + " " + CLRLN)
# Move one line down and erase that line on both sides of the cursor.
sys.stdout.write(DOWN + CLRBLN + CLRLN)
# sys.stdout.write(LOAD)
def print_stats_once(stats):
    """Print a one-shot usage table to stdout.

    Writes a title line, then, if any filesystem has been recorded, a column
    header, one row per entry of stats["running"] and a final totals row.
    The header and the totals row are built with the same column widths as
    the data rows (8, 10, 4 and 10 characters) so that the columns line up.
    """
    write = sys.stdout.write
    write(c((S, C)) + "= DISK FREE = " + c((E,)) + CLRLN + "\n")
    if not stats["running"]:
        return

    header = " ".join(
        ["Total".rjust(8), "Used".rjust(10), "Use%".rjust(4), "Free".rjust(10)]
    )
    write(header + CLRLN + "\n")

    # Record layout: [0] free bytes, [2] mount point, [4] usage string,
    # [5] filesystem type, [6] total bytes.
    for record in stats["running"]:
        row = " ".join(
            [
                human_size(record[6]).rjust(8),
                human_size(record[6] - record[0]).rjust(10),
                colorize_usage(record[4]),
                human_size(record[0]).rjust(10),
                record[2],
                "(" + record[5] + ")",
            ]
        )
        write(row + CLRLN + "\n")

    total_free, total_size = stats["totals"]
    totals_row = " ".join(
        [
            human_size(total_size).rjust(8),
            human_size(total_size - total_free).rjust(10),
            " " * 4,  # empty cell under the Use% column
            human_size(total_free).rjust(10),
        ]
    )
    write(totals_row + CLRLN + "\n")
def colorize_usage(string):
    """Return the usage string (e.g. "42%") coloured and padded to width 4.

    Usage above 95 is red, below 80 green, anything in between yellow.  A
    value that is not a number (for example "-") is shown in green instead of
    raising.  Every value is right-aligned to the same visible width, so
    single-digit percentages line up with the others.
    """
    padded = string.rjust(4)
    try:
        usage = int(string.rstrip("%"))
    except ValueError:
        return c((S, G)) + padded + c((E,))
    if usage > 95:
        colour = R
    elif usage < 80:
        colour = G
    else:
        colour = Y
    return c((S, colour)) + padded + c((E,))
def mean_speed(history):
    """Return the arithmetic mean of *history*, truncated to an int.

    An empty history yields 0 instead of raising ZeroDivisionError.
    """
    if not history:
        return 0
    return int(sum(history) / len(history))
def human_time(dt=False):
    """Format *dt* as HH:MM:SS; a falsy *dt* (the default) means the current local time."""
    moment = dt or datetime.now()
    return moment.strftime("%H:%M:%S")
def human_size(size, precision=1):
    """Format a byte count with a binary (1024-based) unit suffix.

    size      -- number of bytes; may be negative; None yields "nan"
    precision -- decimals shown for KB and larger; plain bytes show none

    Exactly 1024 of a unit is promoted to the next unit (1024 -> "1.0KB"),
    and values beyond the largest suffix stay in "ZB" instead of raising.
    """
    if size is None:
        return "nan"
    sign = ""
    if size < 0:
        sign = "-"
        size = -size
    suffixes = ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"]
    index = 0
    # Stop at the last suffix so that huge values cannot index past the list.
    while size >= 1024 and index < len(suffixes) - 1:
        index += 1
        size = size / 1024.0
    shown_precision = precision if index else 0
    return "%s%.*f%s" % (sign, shown_precision, size, suffixes[index])
def readinput(lf):
    """Read one line from lf.stdout.

    Returns the line as read, or the string "CleanerTimeout" when the read
    fails with an ordinary exception.  KeyboardInterrupt and SystemExit are
    not swallowed, so Ctrl-C still stops the program.
    """
    try:
        return lf.stdout.readline()
    except Exception:
        return "CleanerTimeout"
def termsize():
    """Return the terminal size as a (rows, columns) tuple of ints.

    Uses shutil.get_terminal_size, which honours the COLUMNS and LINES
    environment variables and otherwise queries the terminal; when the size
    cannot be determined it falls back to 25 rows by 80 columns.  No external
    `stty` process is started, so no pipe is left open and nothing is printed
    on stderr when there is no terminal.
    """
    import shutil

    size = shutil.get_terminal_size(fallback=(80, 25))
    return (size.lines, size.columns)
# Parsed command line; read as a global by c() and colorize().
options = setup_options()

# Highlighting rules for colorize(): rule name -> (compiled pattern,
# replacement that wraps the matched text in bold colour and a reset).
color_match = {
    rule: (re.compile(pattern), c([colour, S]) + "\\1" + c([E]))
    for rule, pattern, colour in (
        ("err", "(Failed)", R),
        ("done", "(Done)", G),
        ("percent", "([0-9]+%)", Y),
    )
}

# Mutable program state that count_running() and the print functions share.
stats = dict(
    time=datetime.now(),
    running=[],
    files=[],
    totals=[],
    size=termsize(),
    delay=options.delay,
)

# In continuous mode, clear the screen before the first refresh.
if not options.once:
    sys.stdout.write(CLR + pos(0, 0) + "Launching...")

# One "-x <type>" pair for df per excluded filesystem type.
omit_opts = []
for omit in options.exclude.split(","):
    omit_opts.extend(("-x", omit.strip()))
# Main loop: run `df -T`, fold its output into stats and draw the result.
# In once mode (-1) the table is printed a single time and the program exits;
# otherwise the screen is refreshed every options.delay seconds until df
# produces no output or the user presses Ctrl-C.
while True:
    try:
        proc = subprocess.Popen(
            ["df", "-T"] + omit_opts + options.paths, stdout=subprocess.PIPE
        )
        stdout, _stderr = proc.communicate()
        if not stdout:
            raise EndProgram
        # Mount points are arbitrary bytes; replace anything that is not valid
        # UTF-8 rather than crashing with UnicodeDecodeError.  The first line
        # is df's column header and is skipped.
        for line in stdout.decode("utf-8", errors="replace").split("\n")[1:]:
            stats = count_running(line, stats)
        if options.once:
            print_stats_once(stats)
            sys.exit(0)
        print_stats(stats)
        sys.stdout.flush()
        time.sleep(options.delay)
    except (EndProgram, KeyboardInterrupt):
        # Leave the cursor below the drawn area before exiting.
        sys.stdout.write(DOWN + "\n")
        sys.stdout.flush()
        sys.exit(0)