diff --git a/bin/rsync-queue b/bin/rsync-queue new file mode 120000 index 0000000..1ad0c92 --- /dev/null +++ b/bin/rsync-queue @@ -0,0 +1 @@ +../files/rsync-queue \ No newline at end of file diff --git a/files/rsync-queue b/files/rsync-queue new file mode 100755 index 0000000..4016a8d --- /dev/null +++ b/files/rsync-queue @@ -0,0 +1,156 @@ +#!/usr/bin/python +import sys +import os,time,datetime +import sqlite3 +import subprocess,shlex +from argparse import ArgumentParser + +SQLFILE=os.path.expandvars('$HOME/.rsync-queue.sqlite') + +def setup_options(): + parser=ArgumentParser(description="Maintain a queue for file synchronization") + parser.add_argument("-r",action="store_true",dest="run",default=False, + help="Start rsync") + parser.add_argument("-R",action="store_true",dest="forever",default=False, + help="Start rsync in a loop, never exit") + parser.add_argument("--rerun",action="store_true",dest="rerun",default=False, + help="When running, run even the correctly exited.") + parser.add_argument("-l",action="store_true",dest="listDB",default=False, + help="List DB contents") + parser.add_argument("-L",action="store_true",dest="listAllDB",default=False, + help="List DB contents, even the completed") + parser.add_argument("-f",action="store",dest="sqlfile",default=SQLFILE, + help="SQL file name to use [%(default)s]") + parser.add_argument("-c",action="store_true",dest="clear",default=False, + help="Clear DB of completed entries") + parser.add_argument("-C",action="store_true",dest="clearAll",default=False, + help="Clear DB of all entries") + parser.add_argument("-o",action="store",dest="options",default="-vaP", + help="Options to rsync") + parser.add_argument('SRC', action="store",default='', nargs='?') + parser.add_argument('TGT', action="store",default='', nargs='?') + + options=parser.parse_args() + if options.forever: + options.run=True + if options.clearAll: + options.clear=True + return options + +def createdb(fname): + conn=sqlite3.connect(fname) + db=conn.cursor() + conn.text_factory=str + db.execute('CREATE TABLE list (id INTEGER PRIMARY KEY AUTOINCREMENT,\ + SRC TEXT,TGT TEXT, exitcode INTEGER)') + conn.commit() + return + +def mark_done(options, SRC,TGT, exitcode): + conn=sqlite3.connect(options.sqlfile) + conn.text_factory=str + db=conn.cursor() + db.execute("UPDATE list SET exitcode=? \ + WHERE SRC=? AND TGT=?",(exitcode,SRC,TGT)) + conn.commit() + return + +def add(options): + conn=sqlite3.connect(options.sqlfile) + conn.text_factory=str + db=conn.cursor() + db.execute("INSERT INTO list(SRC,TGT,exitcode)\ + VALUES(?,?,?)",(options.SRC,options.TGT,1)) + conn.commit() + return + +def clear(options,everything=False): + conn=sqlite3.connect(options.sqlfile) + conn.text_factory=str + db=conn.cursor() + db.execute("DELETE FROM list WHERE exitcode == 0") + if everything: + db.execute("DELETE FROM list WHERE SRC LIKE '%'") + conn.commit() + return + +def get_list(options): + conn=sqlite3.connect(options.sqlfile) + conn.text_factory=str + db=conn.cursor() + if options.rerun: + db.execute("SELECT SRC,TGT FROM list ORDER BY id") + else: + db.execute("SELECT SRC,TGT FROM list WHERE exitcode > 0 ORDER BY id") + nextEnt=db.fetchall() + return nextEnt + +def list_URLs(options): + conn=sqlite3.connect(options.sqlfile) + conn.text_factory=str + db=conn.cursor() + if options.listAllDB: + db.execute("SELECT * FROM list ORDER BY id") + else: + db.execute("SELECT * FROM list WHERE exitcode > 0 ORDER BY id") + print "EC\tSRC\tTGT" + for row in db: + print "%s\t%s\t%s" % (row[3],row[1],row[2]) + return + +def start_sync(options): + sync_list=get_list(options) + for sync in sync_list: + (SRC,TGT)=sync + if not SRC: + return + print("Starting: %s -> %s"%(SRC,TGT)) + syncopts=shlex.split(options.options) + command=['rsync'] + command.extend(syncopts) + command.extend([SRC,TGT]) + popen = subprocess.Popen(command, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=8) + lines_iterator = iter(popen.stdout.readline, b"") + try: + for line in lines_iterator: + sys.stdout.write(line) + except KeyboardInterrupt: + popen.kill() + sys.exit(1) + mark_done(options,SRC,TGT,popen.wait()) + if popen.returncode>0: + lines_iterator = iter(popen.stderr.readline, b"") + for line in lines_iterator: + sys.stdout.write(line) + print("FAILED: EC: %d, %s -> %s"%(popen.returncode,SRC,TGT)) + + else: + print("Finished: %s -> %s"%(SRC,TGT)) + +def main(): + options=setup_options(); + + if not os.path.exists(options.sqlfile): + createdb(options.sqlfile); + if options.SRC!='' and options.TGT!='': + print("Adding: %s -> %s"%(options.SRC,options.TGT)) + add(options) + if options.clear: + print("Clearing database") + clear(options,options.clearAll) + if options.listDB: + list_URLs(options) + if options.listAllDB: + list_URLs(options) + if options.run: + print("Start synchronization. ctrl-c to exit") + while True: + start_sync(options) + if not options.forever: + break + else: + time.sleep(5) + sys.exit(0) + +main()