Files
flees/code/utils/utils.py

294 lines
8.3 KiB
Python

import os
from datetime import datetime
from flask import current_app as app
import requests
import re
import json
import stat
try:
import magic
except ImportError:
pass
try:
from werkzeug.utils import secure_filename
except ImportError:
pass
try:
from urllib.request import pathname2url
from urllib.request import urlparse
except ImportError:
from urllib import pathname2url
from urlparse import urlparse
class Logger:
def __init__(self, filename, uid = 0, gid = 0):
self.filename = filename
self.uid = uid
self.gid = gid
self.init()
def info(self, message):
self.write("INFO", str(message))
def init(self):
self.info("Service started")
self.set_rights()
def set_rights(self):
os.chown(
self.filename,
self.uid,
self.gid
)
st = os.stat(self.filename)
if self.uid > 0:
os.chmod(
self.filename,
st.st_mode | stat.S_IRUSR | stat.S_IWUSR
)
if self.gid > 0:
os.chmod(
self.filename,
st.st_mode | stat.S_IRGRP | stat.S_IWGRP
)
def warning(self, message):
self.write("WARNING", str(message))
def write(self, level, message):
with open(self.filename, 'at') as fp:
fp.write("%s\t%s\t%s\n"%(
datetime.now().isoformat(),
level,
message
))
fp.flush()
def download_url(url, filename):
try:
r = requests.get(url, stream=True)
with open(filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024 * 1024):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
except requests.exceptions.RequestException as e:
return (False, ("%s %s"%(e.code,e.reason), e.code))
return (True, ("OK", 200 ))
def file_date_human(num):
return datetime.fromtimestamp(
num
).strftime(app.config['DATE_FORMAT'])
def file_name_version(full_path):
""" New name versioned with date of the file """
file_dir = os.path.dirname(full_path)
file_name = os.path.basename(full_path)
file_time = os.stat(full_path).st_mtime
time_formatted = datetime.fromtimestamp(
file_time
).strftime('%Y%m%d_%H%M%S')
new_name = '%s.%s'%(
time_formatted,
file_name
)
return new_name
def version_date(full_path):
""" Date of versioned file """
file_dir = os.path.dirname(full_path)
file_name = os.path.basename(full_path)
try:
return datetime.strptime(
file_name[0:15],
'%Y%m%d_%H%M%S'
)
except ValueError:
return None
def file_mime(filename):
try:
return magic.from_file(filename, mime = True)
except NameError:
# magic not imported
return "NA"
def file_stat(path, filename):
full_path = os.path.join(path, filename)
s = os.stat(full_path)
return {
'size': file_size_MB(s.st_size),
'hsize': file_size_human(s.st_size, HTML = False),
'mtime': file_date_human(s.st_mtime),
'name': filename,
'url': path2url(filename),
'editable': (s.st_size < 65536 and filename.endswith(".txt")),
'mime': file_mime(full_path)
}
def file_size_human(num, HTML = True):
space = '&nbsp;' if HTML else ' '
for x in [space + 'B', 'KB', 'MB', 'GB', 'TB']:
if num < 1024.0:
if x == space + 'B':
return "%d%s%s" % (num, space, x)
return "%3.1f%s%s" % (num, space, x)
num /= 1024.0
def file_size_MB(num):
return "{:,.2f}".format(num/(1024*1024))
def get_folder_size(path):
total_size = 0
for dirpath, dirnames, filenames in os.walk(path):
for f in filenames:
fp = os.path.join(dirpath, f)
total_size += os.path.getsize(fp)
return total_size
def get_or_none(key,d,none = None):
if key in d:
return d[key]
else:
return none
def get_script_url(public_url, share, end_point, token = "[TOKEN]"):
cmd = None
doc = None
if get_or_none("direct_links", share) and end_point == "download":
end_point = "direct"
url = "%s/script/%s/%s/%s"%(
public_url,
end_point,
share['name'],
token
)
if end_point in ( "download", "direct"):
cmd = 'curl -s %s | bash /dev/stdin [-f]'%( url, )
doc = 'Download all files in the share. -f to force overwrite existing files.'
if end_point == "client":
cmd = 'python <( curl -s %s )'%( url, )
doc = 'Console client to download and upload files.'
if end_point == "upload_split":
cmd = 'curl -s %s | python - [-s split_size_in_Mb] file_to_upload.ext [second.file.ext]'%( url, )
doc = 'Upload files to the share. -s to set splitting size.'
if end_point == "flip":
cmd = 'curl -s %s > flip && ./flip'%( url, )
doc = 'Use the share as a command line clipboard'
return {'cmd': cmd, 'doc': doc}
def is_path_safe(path):
if path.startswith("."):
return False
if "/." in path:
return False
return True
def is_valid_url(url, qualifying = None):
min_attributes = ('scheme', 'netloc')
qualifying = min_attributes if qualifying is None else qualifying
token = urlparse(url)
return all([getattr(token, qualifying_attr)
for qualifying_attr in qualifying])
def iter_folder_files(path, recursive = True, version_folder = None):
if recursive:
for dirpath, dirnames, filenames in os.walk(path, topdown = False):
path_list = dirpath.split(os.sep)
if path_list[-1] == version_folder:
continue
relative_path = os.path.relpath(dirpath,path)
dirnames.sort()
if "/." in relative_path:
continue
if relative_path == ".":
relative_path = ""
for f in sorted(filenames):
if f.startswith("."):
continue
fp = os.path.join(relative_path, f)
yield fp
else:
for file in sorted(path):
fp = os.path.join(path,file)
if os.path.isdir(fp):
continue
if file.startswith("."):
continue
yield fp
def path2url(path):
return pathname2url(path)
def read_config(app):
# Read config from json
config_values = json.load(open(os.getenv('FLEES_CONFIG'),'rt'))
app.config['PUBLIC_URL'] = config_values['public_url']
app.config['SITE_NAME'] = config_values['site_name']
app.config['UPLOAD_FOLDER'] = config_values['data_folder']
app.config['SHARES_FILE'] = config_values['shares_file']
if 'log_file' in config_values:
app.config['LOG_FILE'] = config_values['log_file']
else:
app.config['LOG_FILE'] = os.path.join(app.config['UPLOAD_FOLDER'], 'flees.log')
app.config['ZIP_FOLDER'] = config_values['zip_folder']
app.config['MAX_ZIP_SIZE'] = config_values['max_zip_size'] # megabytes
app.config['DATE_FORMAT'] = config_values['date_format']
app.config['UID'] = config_values['uid']
app.config['GID'] = config_values['gid']
app.config['DEBUG'] = config_values['debug']
app.config['VERSION_FOLDER'] = config_values['version_folder']
app.config['SESSION_COOKIE_NAME'] = secure_filename(config_values['site_name'])
app.config['LOGGER'] = Logger(
app.config['LOG_FILE'],
app.config['UID'],
app.config['GID']
)
return config_values
def safe_name(s):
return safe_string(s, "-_")
def safe_path(s):
return safe_string(s, "-_/")
def safe_string(s, valid, no_repeat = False):
""" return a safe string, replace non alnum characters with _ . all characters in valid are considered valid. """
safe = "".join([c if c.isalnum() or c in valid else "_" for c in s])
if no_repeat:
safe = re.sub(r'_+', '_', safe)
return safe
def set_rights(path):
os.chown(path, app.config['UID'], app.config['GID'])
st = os.stat(path)
if app.config['UID'] > 0:
os.chmod(path, st.st_mode | stat.S_IRUSR | stat.S_IWUSR)
if app.config['GID'] > 0:
os.chmod(path, st.st_mode | stat.S_IRGRP | stat.S_IWGRP)