Files
q-tools/web/transfer-time
2025-10-10 10:23:24 +03:00

346 lines
9.2 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import datetime
import os
import sys
import time
def is_digit(d):
    """Return True when ``int(d)`` succeeds, False when it raises ValueError."""
    try:
        int(d)
    except ValueError:
        return False
    else:
        return True
def is_numerical(f):
    """Return True when *f* can be converted with ``float()``.

    ``None`` and other non-convertible objects yield False instead of
    raising, because the parsers in this file pass the (possibly ``None``)
    result of ``get_numerical`` straight into this check.
    """
    try:
        float(f)
    except (TypeError, ValueError):
        # TypeError: float(None) and friends; ValueError: malformed text.
        return False
    return True
def get_files_size(path):
    """Return the size in bytes of a file, or the summed size of a directory tree.

    Args:
        path: File or directory path.

    Returns:
        The size as a float, or None when *path* does not exist (or a single
        file disappears between the existence check and the size query).

    Entries that cannot be measured while walking a directory (dangling
    symlinks, files removed or renamed mid-walk) are skipped rather than
    aborting the whole measurement; this matters when the tree is being
    written to while it is polled.
    """
    if not os.path.exists(path):
        return None
    if os.path.isfile(path):
        try:
            return float(os.path.getsize(path))
        except OSError:
            # The file vanished after the checks above: report it as missing.
            return None
    total_size = 0
    for dirpath, _dirnames, filenames in os.walk(path):
        for name in filenames:
            try:
                total_size += os.path.getsize(os.path.join(dirpath, name))
            except OSError:
                # Dangling symlink or a file that disappeared mid-walk.
                continue
    return float(total_size)
def get_numerical(num_string):
    """Split off the leading numeric part of a string such as ``"4.55gb"``.

    The string is lower-cased and cut just after its last digit character.

    Returns:
        ``(numeric_text, end_index)`` when the text up to and including the
        last digit converts to a float, otherwise ``(None, None)``.
    """
    lowered = num_string.lower()
    # Index just past the last digit character; 0 when there is none.
    end = max(
        (position for position, char in enumerate(lowered, start=1) if is_digit(char)),
        default=0,
    )
    candidate = lowered[:end]
    if is_numerical(candidate):
        return candidate, end
    return None, None
def parse_options():
    """Parse and validate the command-line arguments.

    Returns:
        The ``argparse.Namespace`` with ``rate``, ``speed``, ``size``,
        ``time`` and ``follow`` attributes (each None when not given).

    Exits through ``parser.error`` when both ``--rate`` and ``--speed`` are
    given, or (outside ``--follow`` mode) when the number of supplied
    measures among speed-or-rate, time and size is not exactly two.
    """
    parser = argparse.ArgumentParser(
        description="Transfer time calculator. Two arguments only! The third is calculated.",
        # The example uses the option flags: the parser defines no positionals.
        epilog="You may omit the 'b' to time events. ex. '20 minutes / task, 7 tasks to do': %(prog)s -v 3/h -s 7",
    )
    parser.add_argument(
        "--rate",
        "-r",
        help="Rate: inverse of speed. ex: 0.1s/b",
        action="store",
        default=None,
    )
    parser.add_argument(
        "--speed", "-v", help="Speed of transfer (ex. 3.2Mb/s). Time units: s, m, h, d, w", default=None
    )
    parser.add_argument(
        "--size",
        "-s",
        help="Data size (ex. 4.55GB), or folder/file path. Units: b, kb, mb, gb, tb, pb or kib, mib ... ",
        default=None,
    )
    parser.add_argument(
        "--time", "-t", help="Time used to transfer Units: s,m,h,d,w,y or [dd]:[hh]:[mm]:[ss].[ss]... ", default=None
    )
    parser.add_argument(
        "--follow",
        help="File name to read size and keep following change speed. Can use --size to estimate ETA",
        default=None,
    )
    parsed = parser.parse_args()
    if parsed.rate and parsed.speed:
        parser.error("Can't use both rate and speed")
    if parsed.follow:
        # Follow mode measures speed itself; no further measures are required.
        return parsed
    missing = (
        parsed.speed is None and parsed.rate is None,
        parsed.time is None,
        parsed.size is None,
    )
    # Exactly one of the three measures must be left out so it can be computed.
    if sum(missing) != 1:
        parser.error("Exactly two measures required")
    return parsed
def parse_rate(rate_string):
    """Convert a rate ("time per size", e.g. ``"0.1s/b"``) into a speed.

    The part before the slash is a number followed by a time unit
    (``s``, ``m``, ``h``, ``d`` or ``w``); the part after it is a size,
    either a bare unit (``"kb"``, meaning one unit) or a number with a
    unit (``"2kb"``).

    Returns:
        The equivalent speed in size units per second, or None when the
        string cannot be parsed or the rate is zero (which has no finite
        speed).
    """
    if "/" not in rate_string:
        return None
    time_part, size_part = rate_string.split("/", 1)
    amount_text, unit_start = get_numerical(time_part)
    if amount_text is None:
        # No usable number before the time unit.
        return None
    amount = float(amount_text)
    if amount == 0:
        # Zero time per unit would mean an infinite speed.
        return None
    time_unit = time_part[unit_start:]
    # A bare unit such as "kb" means "per 1 kb"; only then is the implicit
    # "1" added, so an explicit count ("2kb") is not turned into "12kb".
    if get_numerical(size_part)[0] is None:
        size_part = "1" + size_part
    unit_size = parse_size(size_part)
    if unit_size is None:
        # Cannot parse
        return None
    seconds_per_unit = {
        "s": 1,
        "m": 60,
        "h": 3600,
        "d": 24 * 3600,
        "w": 7 * 24 * 3600,
    }.get(time_unit)
    if seconds_per_unit is None:
        return None
    return unit_size / seconds_per_unit / amount
def parse_size(size_string):
    """Convert a size such as ``"4.55gb"`` into a plain number.

    Args:
        size_string: A number optionally followed by a unit. The unit is
            matched case-insensitively and surrounding whitespace is
            ignored; no unit or ``b`` means the number itself.

    Returns:
        The size as a float, or None when the number or the unit is not
        recognised.
    """
    number_text, unit_start = get_numerical(size_string)
    if number_text is None:
        # No numeric prefix: report "cannot parse" instead of crashing.
        return None
    value = float(number_text)
    unit = size_string[unit_start:].strip().lower()
    # NOTE(review): by IEC convention "KiB" is 1024 bytes and "kB" is 1000;
    # this table maps them the other way round. The values are kept as they
    # were because size_human() in this file also labels 1024-based values
    # "KB" - confirm the intended convention before swapping them.
    multipliers = {
        "": 1,
        "b": 1,
        "kb": 1024,
        "mb": 1024**2,
        "gb": 1024**3,
        "tb": 1024**4,
        "pb": 1024**5,
        "kib": 1000,
        "mib": 1000**2,
        "gib": 1000**3,
        "tib": 1000**4,
        "pib": 1000**5,
    }
    factor = multipliers.get(unit)
    if factor is None:
        return None
    return factor * value
def parse_speed(speed_string):
    """Convert a speed ("size per time", e.g. ``"3.2mb/s"``) to units per second.

    The part before the slash is parsed with ``parse_size``; the part after
    it must be one of the time units ``s``, ``m``, ``h``, ``d`` or ``w``
    (the set advertised by the ``--speed`` help text).

    Returns:
        The speed per second as a float, or None when the string cannot be
        parsed.
    """
    if "/" not in speed_string:
        return None
    size_part, time_unit = speed_string.split("/", 1)
    amount = parse_size(size_part)
    if amount is None:
        # Cannot parse
        return None
    seconds_per_unit = {
        "s": 1,
        "m": 60,
        "h": 3600,
        "d": 24 * 3600,
        "w": 7 * 24 * 3600,
    }.get(time_unit)
    if seconds_per_unit is None:
        return None
    return amount / seconds_per_unit
def parse_time(time_string):
    """Return in seconds.

    Accepted forms:
        * ``[dd]:[hh]:[mm]:[ss]`` - up to four colon-separated fields, read
          from the right; an empty field counts as zero.
        * a plain number - already seconds.
        * a number followed by one unit letter: ``s``, ``m``, ``h``, ``d``,
          ``w`` or ``y`` (a year is taken as 365 days).

    Raises:
        ValueError: when the string has too many fields, an unknown unit or
            a value that is not a number.
    """
    if ":" in time_string:
        fields = time_string.split(":")
        weights = (1, 60, 60 * 60, 24 * 60 * 60)
        if len(fields) > len(weights):
            raise ValueError("Too many digits in time string")
        seconds = 0.0
        # Pair fields with their weights starting from the seconds field.
        for weight, field in zip(weights, reversed(fields)):
            if field.strip():
                seconds += weight * float(field)
        return seconds
    try:
        return float(time_string)
    except ValueError:
        pass  # Not a bare number: expect "<value><unit>" below.
    if not time_string:
        raise ValueError("Can't parse time: empty string")
    unit_seconds = {
        "s": 1,
        "m": 60,
        "h": 60 * 60,
        "d": 24 * 60 * 60,
        "w": 7 * 24 * 60 * 60,
        "y": 365 * 24 * 60 * 60,
    }
    value_text, unit = time_string[:-1], time_string[-1]
    try:
        value = float(value_text)
    except ValueError:
        raise ValueError("Can't parse time value: %r" % (time_string,)) from None
    if unit not in unit_seconds:
        raise ValueError("Can't parse time unit")
    return unit_seconds[unit] * value
def print_err(s):
    """Write ``str(s)`` plus a newline to standard error and flush it."""
    stream = sys.stderr
    stream.write("%s\n" % (s,))
    stream.flush()
def time_human(seconds):
    """Format a number of seconds using ``datetime.timedelta``'s string form."""
    delta = datetime.timedelta(seconds=seconds)
    return str(delta)
def size_human(size, precision=1):
    """Format a size with a 1024-based suffix (B, KB, MB, GB, TB, PB).

    Args:
        size: The value to format, or None.
        precision: Decimal places used once the value has been scaled at
            least once; unscaled values are printed without decimals.

    Returns:
        ``"nan"`` for None, otherwise the scaled number followed by its
        suffix. Values beyond the largest suffix stay expressed in PB, and
        negative values are scaled by magnitude.
    """
    if size is None:
        return "nan"
    suffixes = ["B", "KB", "MB", "GB", "TB", "PB"]
    index = 0
    used_precision = 0
    # Stop at the last suffix so very large values cannot index past it.
    while abs(size) >= 1024 and index < len(suffixes) - 1:
        index += 1
        size = size / 1024.0
        used_precision = precision
    return "%.*f%s" % (used_precision, size, suffixes[index])
def keep_following(opts, max_size):
    """Poll a path once per second and print its size and transfer speed.

    Args:
        opts: Parsed options; only ``opts.follow`` (the path to watch) is read.
        max_size: Expected final size. When truthy, an ETA based on the mean
            of the last three speed samples is appended to the status line.

    Runs until interrupted with Ctrl-C. Any other exception is printed and
    re-raised.
    """

    def current_size():
        # A missing path counts as size zero.
        measured = get_files_size(opts.follow)
        return 0 if measured is None else measured

    last_size = current_size()
    start_time = time.time()
    last_time = start_time
    max_str = ""
    clr = "\033[K"  # ANSI "erase to end of line", so shorter lines overwrite cleanly.
    speed_lag = [0, 0, 0]
    while True:
        try:
            time.sleep(1)
            curr_size = current_size()
            curr_time = time.time()
            time_elapsed = curr_time - start_time
            time_diff = curr_time - last_time
            size_diff = curr_size - last_size
            # Guard against a wall clock that stalled or stepped backwards.
            speed = size_diff / time_diff if time_diff > 0 else 0
            speed_lag.append(speed)
            speed_lag.pop(0)
            speed_mean = sum(speed_lag) / 3
            if max_size:
                time_eta = 0
                # The mean can be zero or negative even when the latest
                # sample is positive, so both must be checked before dividing.
                if speed > 0 and speed_mean > 0:
                    to_transfer = max_size - curr_size
                    time_eta = round(to_transfer / speed_mean)
                if time_eta > 0:
                    max_str = f", ETA: {time_human(time_eta)}"
                else:
                    max_str = ", ETA: NA"
            print(
                f"{clr}Transfer time: {time_human(round(time_elapsed))}, size: {size_human(curr_size)}, speed: {size_human(speed_mean)}/s{max_str}",
                end="\r",
            )
            last_size = curr_size
            last_time = curr_time
        except Exception as e:
            print(e, end="\r")
            raise
        except KeyboardInterrupt:
            break
    # Move off the status line that was being rewritten in place.
    print("")
def main():
    """Entry point: parse the options and print the missing third measure.

    With ``--follow`` the function hands over to ``keep_following`` instead.

    Raises:
        ValueError: when a rate, speed, size or time cannot be parsed, or
            when the computation would divide by a zero speed or time.
    """
    opts = parse_options()
    # "duration" rather than "time" so the time module is not shadowed.
    speed, size, duration = (None, None, None)
    if opts.rate is not None:
        speed = parse_rate(opts.rate.lower())
        if speed is None:
            raise ValueError("Cannot parse rate ( ex. 3.5s/kb ), Rate: %s" % (opts.rate,))
    if opts.speed is not None:
        speed = parse_speed(opts.speed.lower())
        if speed is None:
            raise ValueError("Cannot parse speed ( ex. 3.5Mb/s ), Speed: %s" % (opts.speed,))
    if opts.size is not None:
        # An existing path takes precedence over interpreting the text as a size.
        if os.path.exists(opts.size):
            size = get_files_size(opts.size)
        else:
            size = parse_size(opts.size.lower())
        if size is None:
            raise ValueError(
                "Cannot parse size, and it's not a path either ( ex. 11Gb / file.name ), Size: %s" % (opts.size,)
            )
    if opts.follow is not None:
        keep_following(opts, size)
        return
    if opts.time is not None:
        duration = parse_time(opts.time.lower())
    if duration is None:
        if speed == 0:
            raise ValueError("Speed must be non-zero to compute a transfer time")
        print(f"Transfer time: {time_human(round(size / speed))}")
    if size is None:
        print(f"Transferred size: {size_human(speed * duration)}")
    if speed is None:
        if duration == 0:
            raise ValueError("Time must be non-zero to compute a transfer speed")
        print(f"Transfer speed: {size_human(size/duration)}/s")
# Run the calculator only when the file is executed as a script.
if __name__ == "__main__":
    main()