Skip to content
Snippets Groups Projects
Commit e7817983 authored by Lars Beckers's avatar Lars Beckers
Browse files

reformat generate.py with black

also adds a compatible flake8 config
parent 5519c486
No related branches found
No related tags found
No related merge requests found
Pipeline #2440 passed
[flake8]
max-line-length = 88
extend-ignore = E203
......@@ -7,59 +7,65 @@ import re
import sys
from pathlib import Path
config = configparser.ConfigParser(
interpolation=configparser.ExtendedInterpolation())
config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
try:
with open('./sshgen.cfg') as fp:
with open("./sshgen.cfg") as fp:
config.read_file(fp)
except Exception:
pass
if 'presets' not in config:
config['presets'] = {}
if "presets" not in config:
config["presets"] = {}
choices = []
choice_default = None
else:
choices = list(config['presets'])
choice_default = list(config['presets'].keys())[0]
choices = list(config["presets"])
choice_default = list(config["presets"].keys())[0]
parser = argparse.ArgumentParser(
description='Generates a SSH config file from some DNS zone(s).')
description="Generates a SSH config file from some DNS zone(s)."
)
if choices:
parser.add_argument('--preset', choices=choices, default=choice_default,
help='select a configuration preset')
parser.add_argument(
"--preset",
choices=choices,
default=choice_default,
help="select a configuration preset",
)
else:
parser.add_argument('--preset', action='store', default=choice_default,
help='select a configuration preset')
parser.add_argument('--cfg', action='store', default='./sshgen.cfg',
help='config file')
parser.add_argument(
"--preset",
action="store",
default=choice_default,
help="select a configuration preset",
)
parser.add_argument("--cfg", action="store", default="./sshgen.cfg", help="config file")
args = parser.parse_args()
preset = args.preset
if args.cfg != './sshgen.cfg':
if args.cfg != "./sshgen.cfg":
with open(args.cfg) as fp:
config.read_file(fp)
if preset not in list(config['presets']):
sys.exit('preset not in presets configuration')
if preset not in list(config["presets"]):
sys.exit("preset not in presets configuration")
def get_zones():
all_zones = []
for x, y in config['zones'].items():
for x, y in config["zones"].items():
p = Path(y)
if p.is_dir():
all_zones.extend([z for z in p.iterdir()])
elif p.is_file():
all_zones.append(p)
else:
print('incorrectly configured zone {}, skipping'.format(x),
file=sys.stderr)
print("incorrectly configured zone {}, skipping".format(x), file=sys.stderr)
return all_zones
def get_zone_file(zone):
with open(str(zone), 'r') as fp:
return '\n'.join(fp.readlines())
with open(str(zone), "r") as fp:
return "\n".join(fp.readlines())
def retrieve_hosts():
......@@ -72,9 +78,11 @@ def retrieve_hosts():
z = dns.zone.from_text(get_zone_file(k), relativize=False)
# TODO AAAA records (and others)
for (name, ttl, rdata) in z.iterate_rdatas('A'):
for (name, ttl, rdata) in z.iterate_rdatas("A"):
host = h.get(name)
if name in [s.strip() for s in config['excludes']['unmerged'].split(',')]:
if name in [
s.strip() for s in config["excludes"]["unmerged"].split(",")
]:
continue
if host is None:
host = []
......@@ -84,8 +92,10 @@ def retrieve_hosts():
addr = []
i[rdata.address] = addr
addr.append(name)
for (name, ttl, rdata) in z.iterate_rdatas('CNAME'):
if name in [s.strip() for s in config['excludes']['unmerged'].split(',')]:
for (name, ttl, rdata) in z.iterate_rdatas("CNAME"):
if name in [
s.strip() for s in config["excludes"]["unmerged"].split(",")
]:
continue
target = h.get(rdata.target)
if target is None:
......@@ -94,10 +104,12 @@ def retrieve_hosts():
target.append(name)
except dns.zone.UnknownOrigin:
for line in get_zone_file(k).splitlines():
if line.startswith('#') or len(line.strip()) < 3:
if line.startswith("#") or len(line.strip()) < 3:
continue
parts = line.split()
if parts[1] in [s.strip() for s in config['excludes']['unmerged'].split(',')]:
if parts[1] in [
s.strip() for s in config["excludes"]["unmerged"].split(",")
]:
continue
addr = i.get(parts[0])
if addr is None:
......@@ -110,21 +122,23 @@ def retrieve_hosts():
h[parts[1]] = host
if len(parts) > 2:
for alt in parts[2:]:
if alt == '#':
if alt == "#":
break
host.append(alt)
req_set = set(s.strip() for s in config['excludes']['required'].split(',')
if s.strip())
req_set = set(
s.strip() for s in config["excludes"]["required"].split(",") if s.strip()
)
def intersects(s):
cmp_set = s.copy()
for x in s:
parts = x.split('.')
parts = x.split(".")
for n in range(len(parts)):
start = -1 - n
cmp_set.add('.'.join(parts[start:]))
cmp_set.add(".".join(parts[start:]))
return len(req_set.intersection(cmp_set)) > 0
h = {k: v for k, v in h.items() if intersects(set([k] + v))}
fin = False
......@@ -161,29 +175,37 @@ def retrieve_hosts():
proxies = {}
strip_domains = []
preset_config = [k.strip() for k in config['presets'][preset].split(',')]
preset_config = [k.strip() for k in config["presets"][preset].split(",")]
for c in preset_config:
if c.startswith('proxies_'):
proxies.update({re.compile(k.strip()): v
for v in config[c] for k in config[c][v].split(',')})
elif c.startswith('strip_'):
strip_options = config['strips'][c[len('strip_'):]]
strip_domains.extend([re.compile(r'\.{}\.?'.format(k.strip()))
for k in strip_options.split(',')])
if c.startswith("proxies_"):
proxies.update(
{
re.compile(k.strip()): v
for v in config[c]
for k in config[c][v].split(",")
}
)
elif c.startswith("strip_"):
strip_options = config["strips"][c[len("strip_") :]]
strip_domains.extend(
[re.compile(r"\.{}\.?".format(k.strip())) for k in strip_options.split(",")]
)
else:
pass
exclude_hosts = [re.compile(x.strip())
for x in config['excludes']['hosts'].split(',')]
exclude_aliases = [re.compile(x.strip())
for x in config['excludes']['aliases'].split(',')]
usernames = {re.compile(k.strip()): v
for v in config['usernames']
for k in config['usernames'][v].split(',')}
agents = {re.compile(k.strip()): True
for k in config['agents']['enabled'].split(',')}
agents.update({re.compile(k.strip()): False
for k in config['agents']['disabled'].split(',')})
exclude_hosts = [re.compile(x.strip()) for x in config["excludes"]["hosts"].split(",")]
exclude_aliases = [
re.compile(x.strip()) for x in config["excludes"]["aliases"].split(",")
]
usernames = {
re.compile(k.strip()): v
for v in config["usernames"]
for k in config["usernames"][v].split(",")
}
agents = {re.compile(k.strip()): True for k in config["agents"]["enabled"].split(",")}
agents.update(
{re.compile(k.strip()): False for k in config["agents"]["disabled"].split(",")}
)
h = {}
h = retrieve_hosts()
......@@ -191,21 +213,21 @@ h = retrieve_hosts()
def modify_list(h):
for e in exclude_hosts:
h = {l: m for l, m in h.items() if not e.match(str(l))}
h = {item: m for item, m in h.items() if not e.match(str(item))}
for k in h:
h[k] = [l for l in h[k] if not e.match(str(l))]
h[k] = [item for item in h[k] if not e.match(str(item))]
for e in exclude_aliases:
ni = {}
for k in h:
h[k] = [l for l in h[k] if not e.match(str(l))]
h[k] = [item for item in h[k] if not e.match(str(item))]
if e.match(str(k)):
ni[h[k][0]] = h[k][1:]
h.update(ni)
h = {l: m for l, m in h.items() if not e.match(str(l))}
h = {item: m for item, m in h.items() if not e.match(str(item))}
for k in h:
for ak, av in config['aliases'].items():
for ak, av in config["aliases"].items():
if str(k) == ak or str(k)[:-1] == ak:
h[k].extend([x.strip() for x in av.split(',')])
h[k].extend([x.strip() for x in av.split(",")])
return h
......@@ -221,27 +243,31 @@ def re_suffix(pattern, text):
for k in h:
c = [str(k)]
c.extend([str(k)[:-len(re_suffix(d, str(k)))] for d in strip_domains
if re_suffix(d, str(k))])
c.extend(
[
str(k)[: -len(re_suffix(d, str(k)))]
for d in strip_domains
if re_suffix(d, str(k))
]
)
c.extend(map(str, h[k]))
for j in map(str, h[k]):
c.extend([j[:-len(re_suffix(d, j))] for d in strip_domains
if re_suffix(d, j)])
c = [x[:-1] if x.endswith('.') else x for x in c]
c.extend([j[: -len(re_suffix(d, j))] for d in strip_domains if re_suffix(d, j)])
c = [x[:-1] if x.endswith(".") else x for x in c]
print('Host ' + ' '.join(c))
print("Host " + " ".join(c))
hn = str(k)
print('\tHostName ' + (hn[:-1] if hn.endswith('.') else hn))
print("\tHostName " + (hn[:-1] if hn.endswith(".") else hn))
for u in usernames:
if u.match(str(k)):
print('\tUser ' + usernames[u])
print("\tUser " + usernames[u])
break
for a, v in sorted(agents.items(), key=lambda x: x[1]):
if a.match(str(k)):
print('\tForwardAgent ' + ('yes' if v else 'no'))
print("\tForwardAgent " + ("yes" if v else "no"))
break
for p in proxies:
if p.match(str(k)):
print('\tProxyJump ' + proxies[p])
print("\tProxyJump " + proxies[p])
break
print('')
print("")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment