Files
q-tools/files/diskfree-tracker
2020-10-01 15:53:12 +03:00

311 lines
9.0 KiB
Python
Executable File

#!/usr/bin/env python3
import sys,os,glob
from datetime import datetime
from datetime import timedelta
import re,signal,time
import subprocess
#,threading
# Program version, reported by the --version command line option.
VERSION=3
# ANSI SGR attribute codes, kept as strings so c() can join them with ';'.
# 30-36 are foreground colours.
# NOTE(review): 30 is black in the standard SGR palette, although the name W suggests white.
W= '30'
R= '31'
G= '32'
Y= '33'
B= '34'
M= '35'
C= '36'
# 1 = bold/bright, 0 = reset all attributes, 41 = red background.
S= '1'
E= '0'
BR= '41'
# Raw ANSI control sequences written directly to the terminal.
CLR = '\033[2J'      # erase the whole screen
SAVE = '\033[s'      # save cursor position
LOAD = '\033[u'      # restore saved cursor position
CLRLN = '\033[K'     # erase from cursor to end of line
CLRBLN = '\033[1K'   # erase from start of line to cursor
DOWN = '\033[1B'     # move cursor one line down
# Sort key for the per-filesystem tuples: the lower-cased last path component
# of element 2 (count_running() stores the mount point at index 2).
SORTKEY = lambda key: (key[2].split('/')[-1].lower())
def setup_options():
    ''' Setup the command line options

    Builds the argument parser, parses sys.argv and returns the resulting
    namespace with the attributes:
      colors  - False when --no-colors/--nc is given (default True)
      delay   - refresh delay from -n (int, default 3)
      once    - True when -1 is given (default False)
      exclude - comma separated filesystem types from -x/--exclude
    '''
    import argparse
    parser = argparse.ArgumentParser(description='''
Shows the output of df in colour.
''', formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("--no-colors", '--nc', action="store_false", dest="colors", default=True,
                        help="Disable colored output")
    parser.add_argument("-n", type=int, dest="delay", default=3,
                        help="Refresh delay")
    parser.add_argument("-1", action="store_true", dest="once", default=False,
                        help="Run once and exit")
    parser.add_argument("-x", '--exclude', type=str, dest="exclude",
                        default="tmpfs,devtmpfs,squashfs,overlay",
                        help="Comma separated list of excluded filesystem types. Defaults: %(default)s")
    # argparse's version action needs a string; passing the int VERSION
    # makes --version fail with a TypeError instead of printing it.
    parser.add_argument("--version", action='version', version=str(VERSION))
    return parser.parse_args()
def c(attribs):
    ''' Build the ANSI escape that selects the given attribute codes.

    attribs is an iterable of code strings; they are joined with ';'.
    Returns an empty string when colours are disabled in the global options.
    '''
    if options.colors:
        return '\033[%sm' % ';'.join(attribs)
    return ""
def pos(y,x):
    ''' Return the ANSI sequence that moves the cursor to row y, column x. '''
    return "\033[%s;%sH" % (y, x)
def colorize(string):
    ''' Apply every (pattern, replacement) rule of color_match to a string.

    The string is returned untouched when colours are disabled.
    '''
    if options.colors:
        for pattern, replacement in color_match.values():
            string = pattern.sub(replacement, string)
    return string
def count_running(string, stats):
    ''' Fold one line of `df -T` output into the running statistics.

    string: a single output line with the seven columns
            filesystem, type, 1K-blocks, used, available, use%, mount point.
    stats:  the shared state dict; 'running', 'files', 'totals' are updated
            and 'delay' is read.

    Each entry of stats['running'] is the tuple
      (free bytes, speed history, mount point, free bytes at first sight,
       usage string, filesystem type, total bytes).

    Lines that do not have seven columns, or whose size columns are not
    integers (e.g. the header), are ignored. Returns stats.
    '''
    # Split on at most six whitespace runs so a mount point that contains
    # spaces stays intact in the last field instead of the line being dropped.
    fields = string.strip().split(None, 6)
    if len(fields) != 7:
        return stats
    fstype, usage, mount = fields[1], fields[5], fields[6]
    try:
        free = int(fields[4]) * 1024
        total = int(fields[2]) * 1024
    except ValueError:
        return stats
    if mount in stats['files']:
        index = stats['files'].index(mount)
        previous = stats['running'][index]
        # A zero delay (-n 0) would otherwise divide by zero.
        delay = stats['delay'] or 1
        # Sliding window: drop the oldest sample, append the newest.
        speed_history = previous[1][1:]
        speed_history.append((free - previous[0]) / delay)
        stats['running'][index] = (free,           # free space
                                   speed_history,  # change speed
                                   mount,          # mount point
                                   previous[3],    # free space program start
                                   usage,          # usage in %
                                   fstype,         # mount type
                                   total           # total space
                                   )
    else:
        stats['running'].append((free, [0] * 5, mount, free, usage, fstype, total))
        # Only a new entry can change the order or the list of mount points.
        stats['running'].sort(key=SORTKEY)
        stats['files'] = [entry[2] for entry in stats['running']]
    totalfree = sum(entry[0] for entry in stats['running'])
    total_space = sum(entry[6] for entry in stats['running'])
    stats['totals'] = [totalfree, total_space]
    return stats
class EndProgram(Exception):
    ''' Signals that the program should stop in an orderly way '''
def is_number(s):
    ''' Check if a value can be converted to float

    Returns True when float(s) succeeds and False otherwise.
    '''
    try:
        float(s)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "not a number"; anything else
        # (e.g. KeyboardInterrupt) must propagate.
        return False
    return True
def str_short(s,stats):
    ''' Shorten a path-like text to fit the screen

    s:     the text, split on '/'.
    stats: state dict; stats['size'][1] is the terminal width in columns.

    Text shorter than (columns - 16) is returned unchanged. Otherwise the
    first path component is kept, followed by '/...' and as much of the end
    of the remaining path as fits.
    '''
    max_len = stats['size'][1] - 16
    if len(s) < max_len:
        return s
    parts = s.split('/')
    head = parts[0]
    rest = '/'.join(parts[1:])
    tail_len = max_len - len(head) - 5
    # A zero or negative budget must yield no tail; negating it in a slice
    # would instead select the wrong part of the text.
    tail = rest[-tail_len:] if tail_len > 0 else ''
    return head + '/...' + tail
def print_stats(stats):
    ''' Redraw the live screen: title line, one row per filesystem, totals.

    Every line is written at an absolute cursor position, and the rows
    after the totals are blanked. Only the title line is drawn when no
    filesystem has been recorded yet.
    '''
    write = sys.stdout.write
    write(pos(1, 0) + c((S, C)) + "= DISK FREE = " + c((E,)) +
          human_time(stats['time']) + '=>' + c((S, G)) + human_time() + c((E,)) + CLRLN)
    running = stats['running']
    if not running:
        return
    write(pos(2, 0) + " TotalDiff Free Total (usage%) Diff/s (positive=more free space)" + CLRLN)
    for row, entry in enumerate(running):
        columns = [
            human_size(entry[0] - entry[3]).rjust(10),         # change since start
            human_size(entry[0]).rjust(10),                    # free now
            human_size(entry[6]).rjust(8),                     # total size
            colorize_usage(entry[4]),
            (human_size(mean_speed(entry[1])) + "/s").rjust(10),
            entry[2],                                          # mount point
            "(" + entry[5] + ")",                              # filesystem type
        ]
        write(pos(3 + row, 0) + '(' + str(row + 1).rjust(2) + ') ' +
              ' '.join(columns) + CLRLN)
    last_row = len(running) - 1
    totals = [
        ' '.rjust(9),
        human_size(stats['totals'][0]).rjust(10),
        '/',
        human_size(stats['totals'][1]).ljust(10),
    ]
    write(pos(4 + last_row, 0) + 'Total:' + ' '.join(totals) + CLRLN)
    # Blank the rows below the totals line.
    for offset in range(stats['size'][0] - 7 - len(running)):
        write(pos(5 + last_row + offset, 0) + " " + CLRLN)
    write(DOWN + CLRBLN + CLRLN)
def print_stats_once(stats):
    ''' Print the report a single time, line by line.

    Writes the title, a column header, one line per filesystem and a final
    totals line. Only the title is written when nothing has been recorded.
    '''
    emit = sys.stdout.write
    line_end = CLRLN + '\n'
    emit(c((S, C)) + "= DISK FREE = " + c((E,)) + line_end)
    entries = stats['running']
    if not entries:
        return
    emit(" Total Used Use% Free" + line_end)
    for entry in entries:
        free = entry[0]
        total = entry[6]
        columns = [
            human_size(total).rjust(8),
            human_size(total - free).rjust(10),
            colorize_usage(entry[4]),
            human_size(free).rjust(10),
            entry[2],
            "(" + entry[5] + ")",
        ]
        emit(' '.join(columns) + line_end)
    totals = stats['totals']
    summary = [
        human_size(totals[1]).rjust(8),
        human_size(totals[1] - totals[0]).rjust(10),
        ' ',
        human_size(totals[0]).rjust(10),
    ]
    emit(' '.join(summary) + line_end)
def colorize_usage(string):
    ''' Colorize the usage string and pad it to a fixed width of 4

    string: the usage value as text, e.g. '7%', '42%' or '100%'.

    Values below 80 are green, 80 to 95 yellow, above 95 red.
    The text is right-aligned to four characters so that one, two and
    three digit values line up in the table.
    '''
    padded = string.rjust(4)
    # string length indicates value <10
    if len(string) < 3:
        return c((S, G)) + padded + c((E,))
    # string length indicates 100%
    if len(string) == 4:
        return c((S, R)) + padded + c((E,))
    usage = int(string[0:2])
    if usage > 95:
        colour = R
    elif usage < 80:
        colour = G
    else:
        colour = Y
    return c((S, colour)) + padded + c((E,))
def mean_speed(history):
    ''' Return the arithmetic mean of the speed samples, truncated to int

    An empty history yields 0 instead of dividing by zero.
    '''
    if not history:
        return 0
    return int(sum(history) / len(history))
def human_time(dt=False):
    ''' Format a datetime as HH:MM:SS; the current time is used when dt is omitted '''
    moment = dt if dt else datetime.now()
    return moment.strftime("%H:%M:%S")
def human_size(size,precision=1):
    ''' Format a byte count with a binary unit suffix

    size:      number of bytes; None gives 'nan', negative values keep
               their sign.
    precision: decimals shown once the value has been scaled at least once;
               plain byte values are printed without decimals.

    Returns e.g. '512B', '1.0KB', '-2.5MB'.
    '''
    if size is None:
        return 'nan'
    sign = ""
    if size < 0:
        sign = "-"
        size = -size
    suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
    suffix_index = 0
    shown_precision = 0
    # '>=' so that exactly 1024 becomes '1.0KB'; the index bound keeps
    # values beyond the largest unit from running off the suffix list.
    while size >= 1024 and suffix_index < len(suffixes) - 1:
        suffix_index += 1
        size = size / 1024.0
        shown_precision = precision
    return "%s%.*f%s" % (sign, shown_precision, size, suffixes[suffix_index])
def readinput(lf):
    ''' Read one line from the stdout stream of lf

    lf: an object with a readable `stdout` attribute.

    Returns the line read, or the string "CleanerTimeout" when reading
    raises an error.
    '''
    try:
        return lf.stdout.readline()
    except Exception:
        # Best effort on read errors only; KeyboardInterrupt and SystemExit
        # are no longer swallowed here.
        return "CleanerTimeout"
def termsize():
    ''' Return the terminal size as (rows, columns)

    The size is read from the output of `stty size`. When that output
    cannot be read or parsed as two integers, (25, 80) is returned.
    '''
    try:
        # The context manager closes the pipe, which was left open before.
        with os.popen('stty size', 'r') as pipe:
            rows, columns = pipe.read().split()
        # Converting inside the try keeps unparsable output on the fallback path.
        return (int(rows), int(columns))
    except (OSError, ValueError):
        return (25, 80)
# Parse the command line first: c() and colorize() read the global options.
options = setup_options()

# Highlight rules for colorize(): name -> (compiled pattern, replacement).
color_match = {
    'err': (re.compile('(Failed)'), c([R, S]) + '\\1' + c([E])),
    'done': (re.compile('(Done)'), c([G, S]) + '\\1' + c([E])),
    'percent': (re.compile('([0-9]+%)'), c([Y, S]) + '\\1' + c([E])),
}

# Shared state that count_running() updates and the print functions read.
stats = {
    'time': datetime.now(),     # program start time
    'running': [],              # one tuple per filesystem
    'files': [],                # mount points, same order as 'running'
    'totals': [],               # [total free, total size]
    'size': termsize(),         # (rows, columns)
    'delay': options.delay,     # refresh delay
}

if not options.once:
    sys.stdout.write(CLR + pos(0, 0) + "Launching...")

# Turn the comma separated exclude list into repeated "-x TYPE" arguments.
omit_opts = []
for fstype in options.exclude.split(","):
    omit_opts.extend(("-x", fstype.strip()))
# Main loop: run df, fold its output into stats, draw, sleep, repeat.
df_command = ['df', '-T'] + omit_opts
while True:
    try:
        proc = subprocess.Popen(df_command, stdout=subprocess.PIPE)
        raw_output, _ = proc.communicate()
        if not raw_output:
            raise EndProgram
        df_lines = raw_output.decode('utf-8').split('\n')
        # The first line is the column header.
        for line in df_lines[1:]:
            stats = count_running(line, stats)
        if options.once:
            print_stats_once(stats)
            sys.exit(0)
        print_stats(stats)
        sys.stdout.flush()
        time.sleep(options.delay)
    except (EndProgram, KeyboardInterrupt):
        # Leave the cursor below the drawn area before exiting.
        sys.stdout.write(DOWN + '\n')
        sys.stdout.flush()
        sys.exit(0)