diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000000000000000000000000000000000000..8dd399ab55bce8a4a1ddfd0c98170f771f6e5c35 --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +max-line-length = 88 +extend-ignore = E203 diff --git a/generate.py b/generate.py index b06bdceaa858d944b486dc0112fde65d45edcdbf..064eb8277dc2e6d7d7420ed3050ad94eddf29630 100755 --- a/generate.py +++ b/generate.py @@ -7,59 +7,65 @@ import re import sys from pathlib import Path -config = configparser.ConfigParser( - interpolation=configparser.ExtendedInterpolation()) +config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation()) try: - with open('./sshgen.cfg') as fp: + with open("./sshgen.cfg") as fp: config.read_file(fp) except Exception: pass -if 'presets' not in config: - config['presets'] = {} +if "presets" not in config: + config["presets"] = {} choices = [] choice_default = None else: - choices = list(config['presets']) - choice_default = list(config['presets'].keys())[0] + choices = list(config["presets"]) + choice_default = list(config["presets"].keys())[0] parser = argparse.ArgumentParser( - description='Generates a SSH config file from some DNS zone(s).') + description="Generates a SSH config file from some DNS zone(s)." +) if choices: - parser.add_argument('--preset', choices=choices, default=choice_default, - help='select a configuration preset') + parser.add_argument( + "--preset", + choices=choices, + default=choice_default, + help="select a configuration preset", + ) else: - parser.add_argument('--preset', action='store', default=choice_default, - help='select a configuration preset') -parser.add_argument('--cfg', action='store', default='./sshgen.cfg', - help='config file') + parser.add_argument( + "--preset", + action="store", + default=choice_default, + help="select a configuration preset", + ) +parser.add_argument("--cfg", action="store", default="./sshgen.cfg", help="config file") args = parser.parse_args() preset = args.preset -if args.cfg != './sshgen.cfg': +if args.cfg != "./sshgen.cfg": with open(args.cfg) as fp: config.read_file(fp) -if preset not in list(config['presets']): - sys.exit('preset not in presets configuration') +if preset not in list(config["presets"]): + sys.exit("preset not in presets configuration") def get_zones(): all_zones = [] - for x, y in config['zones'].items(): + for x, y in config["zones"].items(): p = Path(y) if p.is_dir(): all_zones.extend([z for z in p.iterdir()]) elif p.is_file(): all_zones.append(p) else: - print('incorrectly configured zone {}, skipping'.format(x), - file=sys.stderr) + print("incorrectly configured zone {}, skipping".format(x), file=sys.stderr) return all_zones def get_zone_file(zone): - with open(str(zone), 'r') as fp: - return '\n'.join(fp.readlines()) + with open(str(zone), "r") as fp: + return "\n".join(fp.readlines()) def retrieve_hosts(): @@ -72,9 +78,11 @@ def retrieve_hosts(): z = dns.zone.from_text(get_zone_file(k), relativize=False) # TODO AAAA records (and others) - for (name, ttl, rdata) in z.iterate_rdatas('A'): + for (name, ttl, rdata) in z.iterate_rdatas("A"): host = h.get(name) - if name in [s.strip() for s in config['excludes']['unmerged'].split(',')]: + if name in [ + s.strip() for s in config["excludes"]["unmerged"].split(",") + ]: continue if host is None: host = [] @@ -84,8 +92,10 @@ def retrieve_hosts(): addr = [] i[rdata.address] = addr addr.append(name) - for (name, ttl, rdata) in z.iterate_rdatas('CNAME'): - if name in [s.strip() for s in config['excludes']['unmerged'].split(',')]: + for (name, ttl, rdata) in z.iterate_rdatas("CNAME"): + if name in [ + s.strip() for s in config["excludes"]["unmerged"].split(",") + ]: continue target = h.get(rdata.target) if target is None: @@ -94,10 +104,12 @@ def retrieve_hosts(): target.append(name) except dns.zone.UnknownOrigin: for line in get_zone_file(k).splitlines(): - if line.startswith('#') or len(line.strip()) < 3: + if line.startswith("#") or len(line.strip()) < 3: continue parts = line.split() - if parts[1] in [s.strip() for s in config['excludes']['unmerged'].split(',')]: + if parts[1] in [ + s.strip() for s in config["excludes"]["unmerged"].split(",") + ]: continue addr = i.get(parts[0]) if addr is None: @@ -110,22 +122,24 @@ def retrieve_hosts(): h[parts[1]] = host if len(parts) > 2: for alt in parts[2:]: - if alt == '#': + if alt == "#": break host.append(alt) - req_set = set(s.strip() for s in config['excludes']['required'].split(',') - if s.strip()) + req_set = set( + s.strip() for s in config["excludes"]["required"].split(",") if s.strip() + ) def intersects(s): cmp_set = s.copy() for x in s: - parts = x.split('.') + parts = x.split(".") for n in range(len(parts)): start = -1 - n - cmp_set.add('.'.join(parts[start:])) + cmp_set.add(".".join(parts[start:])) return len(req_set.intersection(cmp_set)) > 0 - h = {k: v for k, v in h.items() if intersects(set([k]+v))} + + h = {k: v for k, v in h.items() if intersects(set([k] + v))} fin = False while not fin: @@ -161,29 +175,37 @@ def retrieve_hosts(): proxies = {} strip_domains = [] -preset_config = [k.strip() for k in config['presets'][preset].split(',')] +preset_config = [k.strip() for k in config["presets"][preset].split(",")] for c in preset_config: - if c.startswith('proxies_'): - proxies.update({re.compile(k.strip()): v - for v in config[c] for k in config[c][v].split(',')}) - elif c.startswith('strip_'): - strip_options = config['strips'][c[len('strip_'):]] - strip_domains.extend([re.compile(r'\.{}\.?'.format(k.strip())) - for k in strip_options.split(',')]) + if c.startswith("proxies_"): + proxies.update( + { + re.compile(k.strip()): v + for v in config[c] + for k in config[c][v].split(",") + } + ) + elif c.startswith("strip_"): + strip_options = config["strips"][c[len("strip_") :]] + strip_domains.extend( + [re.compile(r"\.{}\.?".format(k.strip())) for k in strip_options.split(",")] + ) else: pass -exclude_hosts = [re.compile(x.strip()) - for x in config['excludes']['hosts'].split(',')] -exclude_aliases = [re.compile(x.strip()) - for x in config['excludes']['aliases'].split(',')] -usernames = {re.compile(k.strip()): v - for v in config['usernames'] - for k in config['usernames'][v].split(',')} -agents = {re.compile(k.strip()): True - for k in config['agents']['enabled'].split(',')} -agents.update({re.compile(k.strip()): False - for k in config['agents']['disabled'].split(',')}) +exclude_hosts = [re.compile(x.strip()) for x in config["excludes"]["hosts"].split(",")] +exclude_aliases = [ + re.compile(x.strip()) for x in config["excludes"]["aliases"].split(",") +] +usernames = { + re.compile(k.strip()): v + for v in config["usernames"] + for k in config["usernames"][v].split(",") +} +agents = {re.compile(k.strip()): True for k in config["agents"]["enabled"].split(",")} +agents.update( + {re.compile(k.strip()): False for k in config["agents"]["disabled"].split(",")} +) h = {} h = retrieve_hosts() @@ -191,21 +213,21 @@ h = retrieve_hosts() def modify_list(h): for e in exclude_hosts: - h = {l: m for l, m in h.items() if not e.match(str(l))} + h = {item: m for item, m in h.items() if not e.match(str(item))} for k in h: - h[k] = [l for l in h[k] if not e.match(str(l))] + h[k] = [item for item in h[k] if not e.match(str(item))] for e in exclude_aliases: ni = {} for k in h: - h[k] = [l for l in h[k] if not e.match(str(l))] + h[k] = [item for item in h[k] if not e.match(str(item))] if e.match(str(k)): ni[h[k][0]] = h[k][1:] h.update(ni) - h = {l: m for l, m in h.items() if not e.match(str(l))} + h = {item: m for item, m in h.items() if not e.match(str(item))} for k in h: - for ak, av in config['aliases'].items(): + for ak, av in config["aliases"].items(): if str(k) == ak or str(k)[:-1] == ak: - h[k].extend([x.strip() for x in av.split(',')]) + h[k].extend([x.strip() for x in av.split(",")]) return h @@ -215,33 +237,37 @@ h = modify_list(h) def re_suffix(pattern, text): res = pattern.search(text) if res and res.span()[1] == len(text) and res.span()[0] != 0: - return text[res.span()[0]:res.span()[1]] + return text[res.span()[0] : res.span()[1]] return None for k in h: c = [str(k)] - c.extend([str(k)[:-len(re_suffix(d, str(k)))] for d in strip_domains - if re_suffix(d, str(k))]) + c.extend( + [ + str(k)[: -len(re_suffix(d, str(k)))] + for d in strip_domains + if re_suffix(d, str(k)) + ] + ) c.extend(map(str, h[k])) for j in map(str, h[k]): - c.extend([j[:-len(re_suffix(d, j))] for d in strip_domains - if re_suffix(d, j)]) - c = [x[:-1] if x.endswith('.') else x for x in c] + c.extend([j[: -len(re_suffix(d, j))] for d in strip_domains if re_suffix(d, j)]) + c = [x[:-1] if x.endswith(".") else x for x in c] - print('Host ' + ' '.join(c)) + print("Host " + " ".join(c)) hn = str(k) - print('\tHostName ' + (hn[:-1] if hn.endswith('.') else hn)) + print("\tHostName " + (hn[:-1] if hn.endswith(".") else hn)) for u in usernames: if u.match(str(k)): - print('\tUser ' + usernames[u]) + print("\tUser " + usernames[u]) break for a, v in sorted(agents.items(), key=lambda x: x[1]): if a.match(str(k)): - print('\tForwardAgent ' + ('yes' if v else 'no')) + print("\tForwardAgent " + ("yes" if v else "no")) break for p in proxies: if p.match(str(k)): - print('\tProxyJump ' + proxies[p]) + print("\tProxyJump " + proxies[p]) break - print('') + print("")