244 lines
7.4 KiB
Python
244 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
from argparse import ArgumentError, ArgumentParser
|
|
|
|
import psutil
|
|
import yaml
|
|
|
|
__version__ = "2024.03.02"

# Per-user configuration directory; created by load_config() when missing.
CONFDIR = os.path.expanduser("~/.config/ssh-tunnelier")
# Legacy JSON config. load_config() converts it to YAML when no YAML config exists yet.
CONF_OLD = os.path.join(CONFDIR, "tunnels.json")
# Current YAML config file.
CONF = os.path.join(CONFDIR, "tunnels.yaml")
# Just over a year in minutes. Used as the iteration limit of the remote
# keep-alive loop built in connect(), which sleeps 60 seconds per iteration.
MAGIC_TIME = 525601
# Printed by list_connections() in place of a "localhost" remote address.
LOCALHOSTSYMBOL = "💻"

# Written to CONF by load_config() when neither the YAML nor the legacy JSON
# config exists. The string values double as inline documentation for the user.
EXAMPLE_CONFIG = {
    "test": {
        "host": "host to connect, or defaults to name",
        "options": "-4 and other ssh options",
        "auto-connect": False,
        "tunnels": [
            {
                "local_port": 1111,
                "remote_port": 8080,
                "remote_address": "localhost",
                "reverse": False,
                "comment": "`comment`, `reverse` and `remote_address` are not required",
            }
        ],
    }
}
|
|
|
|
|
|
def args():
    """Parse the command line and return the resulting options namespace.

    Listing is switched on implicitly when no connection name, no --connect
    spec and no --auto flag is given. --kill without a connection name is
    reported through the parser's error handler, which exits the program.
    """
    parser = ArgumentParser()

    # Optional positional: the connection name.
    parser.add_argument(
        dest="name", nargs="?", type=str, action="store", default=None, help="Connection name to use"
    )
    parser.add_argument("--kill", dest="kill", action="store_true", default=False, help="Kill connection")
    parser.add_argument(
        "--list",
        dest="list",
        action="store_true",
        default=False,
        help="List connections. Default if connection name is not given.",
    )
    parser.add_argument("--edit", dest="edit", default=None, help="Edit config with user defined editor.")
    parser.add_argument(
        "--connect",
        dest="connect",
        type=str,
        action="store",
        default=None,
        help="Instant tunnel. Syntax: sshserver:localport:targethost:remoteport",
    )
    parser.add_argument(
        "--auto",
        dest="auto",
        action="store_true",
        default=False,
        help="Run all connections with auto-connect: true",
    )

    options = parser.parse_args()

    nothing_to_connect = options.name is None and options.connect is None and not options.auto
    if nothing_to_connect:
        options.list = True

    if options.kill and options.name is None:
        parser.error("Connection name required with --kill")

    return options
|
|
|
|
|
|
def check_type(d, key, expected, error_msg):
    """Validate that ``d[key]`` exists and is an instance of *expected*.

    Args:
        d: Mapping to inspect.
        key: Key that must be present in *d*.
        expected: Type (or tuple of types) accepted for ``d[key]``.
        error_msg: Message of the ValueError raised on failure.

    Raises:
        ValueError: If *key* is missing, the value has the wrong type, or a
            bool is supplied where exactly ``int`` is expected.
    """
    if key not in d:
        raise ValueError(error_msg)
    value = d[key]
    # bool is a subclass of int, so isinstance(True, int) is true; without this
    # guard a YAML value such as `local_port: true` would pass as a port number.
    if expected is int and isinstance(value, bool):
        raise ValueError(error_msg)
    if not isinstance(value, expected):
        raise ValueError(error_msg)
|
|
|
|
|
|
def load_config():
    """Load config, check types, add defaults.

    Creates CONFDIR if needed. When CONF does not exist it is either converted
    from the legacy JSON file CONF_OLD or seeded with EXAMPLE_CONFIG. An empty
    config file is treated as "no connections".

    Returns:
        dict: Connection name -> settings, with defaults filled in for "host"
        (the connection name), "options" ("") and, per tunnel,
        "remote_address" ("localhost"), "reverse" (False) and "comment" ("").
        An empty dict is returned, after printing the reason, when the config
        cannot be read or fails validation.
    """
    try:
        os.makedirs(CONFDIR, exist_ok=True)
        if not os.path.exists(CONF):
            if os.path.exists(CONF_OLD):
                # One-time migration of the legacy JSON config to YAML.
                with open(CONF_OLD, "r", encoding="utf-8") as fp:
                    config = json.load(fp)
                with open(CONF, "w", encoding="utf-8") as fp:
                    yaml.dump(config, fp, sort_keys=False)
            else:
                print("Creating example config: " + CONF)
                with open(CONF, "w", encoding="utf-8") as fp:
                    yaml.dump(EXAMPLE_CONFIG, fp)
        with open(CONF, "r", encoding="utf-8") as fp:
            config = yaml.safe_load(fp)

        # safe_load() returns None for an empty document.
        if config is None:
            config = {}
        if not isinstance(config, dict):
            raise ValueError("Config must be a mapping of connection names")

        for name in config:
            entry = config[name]
            if not isinstance(entry, dict):
                raise ValueError(f"Config['{name}'] must be a mapping")
            entry["host"] = entry.get("host", name)
            entry["options"] = entry.get("options", "")
            check_type(entry, "host", str, f"Config['{name}']['host'] must be string")
            check_type(entry, "options", str, f"Config['{name}']['options'] must be string")
            # Without this check a missing key surfaced only as the bare KeyError text.
            check_type(entry, "tunnels", list, f"Config['{name}']['tunnels'] must be a list")

            for i, tunnel in enumerate(entry["tunnels"]):
                if not isinstance(tunnel, dict):
                    raise ValueError(f"Config['{name}']['tunnel'][{i}] must be a mapping")
                tunnel["remote_address"] = tunnel.get("remote_address", "localhost")
                tunnel["reverse"] = tunnel.get("reverse", False)
                tunnel["comment"] = tunnel.get("comment", "")
                check_type(tunnel, "reverse", bool, f"Config['{name}']['tunnel'][{i}]['reverse'] must be Boolean")
                check_type(
                    tunnel, "remote_address", str, f"Config['{name}']['tunnel'][{i}]['remote_address'] must be string"
                )
                check_type(tunnel, "remote_port", int, f"Config['{name}']['tunnel'][{i}]['remote_port'] must be int")
                check_type(tunnel, "local_port", int, f"Config['{name}']['tunnel'][{i}]['local_port'] must be int")
                check_type(tunnel, "comment", str, f"Config['{name}']['tunnel'][{i}]['comment'] must be string")

        return config

    except Exception as e:
        # Deliberately broad: any failure falls back to an empty config so
        # that --edit can still be used to repair the file.
        print(e)
        print(f"Could not load config. Edit {CONF} with --edit")
        return {}
|
|
|
|
|
|
def config_edit(editor):
    """Run *editor* on the config file CONF and wait for it to finish."""
    command = [editor, CONF]
    subprocess.run(command)
|
|
|
|
|
|
def connect(name, config):
    """Start the ssh process for connection *name*, replacing a running one.

    Builds one -L (or -R when "reverse" is set) forward per configured tunnel,
    kills any process already found for this connection, runs ssh in the
    background (-f -n) and finally prints the connection's status.

    Raises:
        ValueError: If *name* is not a key of *config*.
    """
    if name not in config:
        raise ValueError("No such connection name")

    entry = config[name]
    host = entry["host"]
    ssh_options = shlex.split(entry["options"])

    forwards = []
    for spec in entry["tunnels"]:
        direction = "-R" if spec["reverse"] else "-L"
        forwards.extend(
            (direction, f"{spec['local_port']}:{spec['remote_address']}:{spec['remote_port']}")
        )

    conn_id = get_id(name, config)

    # The remote side loops (at most MAGIC_TIME rounds, 60 s apart) and exits
    # once the `kill -0` probe fails. The trailing echo puts the connection id
    # into the ssh command line, which is what get_pid() searches for.
    remote_cmd = f"nice /bin/bash -c 'for ((i=1;i<{MAGIC_TIME};i++)); do cut -f4 -d \" \" /proc/$PPID/stat | xargs kill -0 || exit ; sleep 60;done'; echo tunnelier {conn_id}"

    ssh_command = ["ssh", "-f", "-n"]
    ssh_command += ssh_options
    ssh_command += forwards
    ssh_command += [host, remote_cmd]

    kill_connection(name, config)
    subprocess.run(ssh_command)
    list_connections(config, single=name)
|
|
|
|
|
|
def get_id(name, config):
    """Return the hex MD5 digest of the connection name joined with its host.

    The digest serves as an identifier for the connection, not as a security
    measure.
    """
    seed = name + config[name]["host"]
    digest = hashlib.md5(seed.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
def get_pid(conn_id):
    """Return the PID of the first process whose command line contains *conn_id*.

    Args:
        conn_id: Identifier string to search for in each command line.

    Returns:
        The matching process's PID, or None when no process matches.
    """
    for p in psutil.process_iter():
        try:
            if conn_id in " ".join(p.cmdline()):
                return p.pid
        except (psutil.Error, OSError):
            # Processes that vanished or cannot be inspected are skipped.
            # Unlike a bare `except`, this lets KeyboardInterrupt and
            # SystemExit propagate.
            continue
    return None
|
|
|
|
|
|
def kill_connection(name, config):
    """Terminate the running process of connection *name*, if there is one.

    Looks the process up through get_id()/get_pid(), sends it a terminate
    request and waits for it to exit. Does nothing when no process is found.
    """
    conn_id = get_id(name, config)
    pid = get_pid(conn_id)
    if not pid:
        return
    print(f"Killing PID {pid}")
    try:
        p = psutil.Process(pid)
        p.terminate()
        p.wait()
    except psutil.NoSuchProcess:
        # The process exited between the lookup and the terminate call;
        # that is the state we wanted, so there is nothing left to do.
        pass
|
|
|
|
|
|
def list_connections(config, single=None):
    """Print name, running state and tunnels of the configured connections.

    Args:
        config: Connection name -> settings mapping.
        single: When truthy, only the connection with this name is printed.
    """
    for name in config:
        if single and name != single:
            continue

        pid = get_pid(get_id(name, config))
        if pid:
            state = f"True, PID:{pid}"
        else:
            state = "False"
        auto_prefix = "Auto, " if config[name].get("auto-connect", False) else ""

        print(f"# {name}")
        print(f" - {auto_prefix}Running: {state}")

        for index, spec in enumerate(config[name]["tunnels"]):
            # A URL hint is shown only while the connection is running.
            link = f" http://localhost:{spec['local_port']}" if pid else ""
            target = spec["remote_address"]
            if target == "localhost":
                target = LOCALHOSTSYMBOL
            note = f" '{spec['comment']}'" if spec["comment"] else ""
            print(f" - Tunnel {index}: {spec['local_port']} → {target}:{spec['remote_port']}{note}{link} ")
|
|
|
|
|
|
def auto_connect(config):
    """Call connect() for every connection whose "auto-connect" setting is truthy."""
    for name, entry in config.items():
        if not entry.get("auto-connect", False):
            continue
        connect(name, config)
|
|
|
|
|
|
def _parse_connect(spec):
    """Split an instant-tunnel spec ``sshserver:localport:targethost:remoteport``.

    Returns:
        (host, tunnels): the ssh server and a one-element tunnel list in the
        same shape load_config() produces.

    Raises:
        ValueError: If the spec does not match the documented syntax.
    """
    syntax_error = "--connect syntax: sshserver:localport:targethost:remoteport"
    try:
        host, local_port, rest = spec.split(":", 2)
        # rsplit keeps colons inside the target host intact.
        remote_address, remote_port = rest.rsplit(":", 1)
        tunnel = {
            "local_port": int(local_port),
            "remote_port": int(remote_port),
            "remote_address": remote_address,
            "reverse": False,
            "comment": "",
        }
    except ValueError:
        raise ValueError(syntax_error) from None
    if not host or not remote_address:
        raise ValueError(syntax_error)
    return host, [tunnel]


def _connect_tunnel(name, host, tunnels):
    """Open an ad-hoc connection called *name* through connect()."""
    adhoc_config = {name: {"host": host, "options": "", "tunnels": tunnels}}
    connect(name, adhoc_config)


def main():
    """Entry point: dispatch on the parsed command line options.

    Order of handling: --edit, --list, --auto, --connect, --kill, and finally
    a plain connection name. --auto does not exit, so the options after it are
    still processed in the same run.
    """
    opts = args()
    config = load_config()
    if opts.edit:
        config_edit(opts.edit)
        sys.exit(0)
    if opts.list:
        list_connections(config, opts.name)
        sys.exit(0)
    if opts.auto:
        auto_connect(config)
    if opts.connect:
        # The original called parse_connect()/connect_tunnel(), which are not
        # defined anywhere in this file, so --connect raised NameError.
        try:
            host, tunnels = _parse_connect(opts.connect)
        except ValueError as e:
            sys.exit(str(e))
        # The spec string is used as the connection name, so an instant tunnel
        # gets its own id and does not replace a configured connection.
        _connect_tunnel(opts.connect, host, tunnels)
        sys.exit(0)
    if opts.kill:
        kill_connection(opts.name, config)
        sys.exit(0)
    if opts.name:
        connect(opts.name, config)
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|