From 1dd4436aac8251cc4cb50d3403c62d0c6d28b4ee Mon Sep 17 00:00:00 2001 From: Lars Beckers <lars.beckers@rwth-aachen.de> Date: Sat, 15 Dec 2018 20:53:22 +0100 Subject: [PATCH] improve on pep8 conformance --- generate.py | 102 ++++++++++++++++++++++++++++++++-------------------- zonedl.py | 62 ++++++++++++++++++++------------ 2 files changed, 102 insertions(+), 62 deletions(-) diff --git a/generate.py b/generate.py index 3d786f5..84f62f5 100755 --- a/generate.py +++ b/generate.py @@ -7,14 +7,15 @@ import re import sys from pathlib import Path -config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation()) +config = configparser.ConfigParser( + interpolation=configparser.ExtendedInterpolation()) try: with open('./sshgen.cfg') as fp: config.read_file(fp) except Exception: pass -if not 'presets' in config: +if 'presets' not in config: config['presets'] = {} choices = [] choice_default = None @@ -22,12 +23,16 @@ else: choices = list(config['presets']) choice_default = list(config['presets'].keys())[0] -parser = argparse.ArgumentParser(description='Generates a SSH config file from some DNS zone(s).') +parser = argparse.ArgumentParser( + description='Generates a SSH config file from some DNS zone(s).') if choices: - parser.add_argument('--preset', choices=choices, default=choice_default, help='select a configuration preset') + parser.add_argument('--preset', choices=choices, default=choice_default, + help='select a configuration preset') else: - parser.add_argument('--preset', action='store', default=choice_default, help='select a configuration preset') -parser.add_argument('--cfg', action='store', default='./sshgen.cfg', help='config file') + parser.add_argument('--preset', action='store', default=choice_default, + help='select a configuration preset') +parser.add_argument('--cfg', action='store', default='./sshgen.cfg', + help='config file') args = parser.parse_args() preset = args.preset @@ -37,22 +42,26 @@ if args.cfg != './sshgen.cfg': if preset not in list(config['presets']): sys.exit('preset not in presets configuration') + def get_zones(): all_zones = [] - for x,y in config['zones'].items(): + for x, y in config['zones'].items(): p = Path(y) if p.is_dir(): all_zones.extend([z for z in p.iterdir()]) elif p.is_file(): all_zones.append(p) else: - print('incorrectly configured zone {}, skipping'.format(x), file=sys.stderr) + print('incorrectly configured zone {}, skipping'.format(x), + file=sys.stderr) return all_zones + def get_zone_file(zone): with open(str(zone), 'r') as fp: return '\n'.join(fp.readlines()) + def retrieve_hosts(): d = get_zones() h = {} @@ -63,21 +72,21 @@ def retrieve_hosts(): # TODO AAAA records (and others) for (name, ttl, rdata) in z.iterate_rdatas('A'): - l = h.get(name) - if l is None: - l = [] - h[name] = l - m = i.get(rdata.address) - if m is None: - m = [] - i[rdata.address] = m - m.append(name) + host = h.get(name) + if host is None: + host = [] + h[name] = host + addr = i.get(rdata.address) + if addr is None: + addr = [] + i[rdata.address] = addr + addr.append(name) for (name, ttl, rdata) in z.iterate_rdatas('CNAME'): - l = h.get(rdata.target) - if l is None: - l = [] - h[rdata.target] = l - l.append(name) + target = h.get(rdata.target) + if target is None: + target = [] + h[rdata.target] = target + target.append(name) fin = False while not fin: @@ -96,13 +105,13 @@ def retrieve_hosts(): max_len = 0 max_host = None for j in i[key]: - if j in h: # TODO + if j in h: # TODO x = len(str(h[j])) if x > max_len: max_len = x max_host = j for j in i[key]: - if j == max_host or j not in h: # TODO + if j == max_host or j not in h: # TODO continue h[max_host].append(j) h[max_host].extend(h[j]) @@ -110,29 +119,40 @@ def retrieve_hosts(): return h + proxies = {} strip_domains = [] preset_config = [k.strip() for k in config['presets'][preset].split(',')] for c in preset_config: if c.startswith('proxies_'): - proxies.update({re.compile(k.strip()): v for v in config[c] for k in config[c][v].split(',')}) + proxies.update({re.compile(k.strip()): v + for v in config[c] for k in config[c][v].split(',')}) elif c.startswith('strip_'): - strip_domains.extend([re.compile('\.{}\.?'.format(k.strip())) for k in config['strips'][c[len('strip_'):]].split(',')]) + strip_options = config['strips'][c[len('strip_'):]] + strip_domains.extend([re.compile(r'\.{}\.?'.format(k.strip())) + for k in strip_options.split(',')]) else: pass -exclude_hosts = [re.compile(x.strip()) for x in config['excludes']['hosts'].split(',')] -exclude_aliases = [re.compile(x.strip()) for x in config['excludes']['aliases'].split(',')] -usernames = {re.compile(k.strip()): v for v in config['usernames'] for k in config['usernames'][v].split(',')} -agents = {re.compile(k.strip()): True for k in config['agents']['enabled'].split(',')} -agents.update({re.compile(k.strip()): False for k in config['agents']['disabled'].split(',')}) +exclude_hosts = [re.compile(x.strip()) + for x in config['excludes']['hosts'].split(',')] +exclude_aliases = [re.compile(x.strip()) + for x in config['excludes']['aliases'].split(',')] +usernames = {re.compile(k.strip()): v + for v in config['usernames'] + for k in config['usernames'][v].split(',')} +agents = {re.compile(k.strip()): True + for k in config['agents']['enabled'].split(',')} +agents.update({re.compile(k.strip()): False + for k in config['agents']['disabled'].split(',')}) h = {} h = retrieve_hosts() + def modify_list(h): for e in exclude_hosts: - h = {l:m for l,m in h.items() if not e.match(str(l))} + h = {l: m for l, m in h.items() if not e.match(str(l))} for k in h: h[k] = [l for l in h[k] if not e.match(str(l))] for e in exclude_aliases: @@ -142,27 +162,32 @@ def modify_list(h): if e.match(str(k)): ni[h[k][0]] = h[k][1:] h.update(ni) - h = {l:m for l,m in h.items() if not e.match(str(l))} + h = {l: m for l, m in h.items() if not e.match(str(l))} for k in h: - for ak,av in config['aliases'].items(): + for ak, av in config['aliases'].items(): if str(k) == ak or str(k)[:-1] == ak: h[k].extend([x.strip() for x in av.split(',')]) return h + h = modify_list(h) + def re_suffix(pattern, text): res = pattern.search(text) if res and res.span()[1] == len(text) and res.span()[0] != 0: return text[res.span()[0]:res.span()[1]] return None + for k in h: c = [str(k)] - c.extend([str(k)[:-len(re_suffix(d, str(k)))] for d in strip_domains if re_suffix(d, str(k))]) - c.extend(map(str,h[k])) - for j in map(str,h[k]): - c.extend([j[:-len(re_suffix(d, j))] for d in strip_domains if re_suffix(d, j)]) + c.extend([str(k)[:-len(re_suffix(d, str(k)))] for d in strip_domains + if re_suffix(d, str(k))]) + c.extend(map(str, h[k])) + for j in map(str, h[k]): + c.extend([j[:-len(re_suffix(d, j))] for d in strip_domains + if re_suffix(d, j)]) c = [x[:-1] if x.endswith('.') else x for x in c] print('Host ' + ' '.join(c)) @@ -181,4 +206,3 @@ for k in h: print('\tProxyJump ' + proxies[p]) break print('') - diff --git a/zonedl.py b/zonedl.py index d6e3716..98d5c18 100755 --- a/zonedl.py +++ b/zonedl.py @@ -2,40 +2,46 @@ import requests import bs4 -import re import argparse import getpass import subprocess import sys from pathlib import Path -parser = argparse.ArgumentParser(description='Downloads a zone file from RWTH DNS-Admin-Portal.') +parser = argparse.ArgumentParser( + description='Downloads a zone file from RWTH DNS-Admin-Portal.') group = parser.add_mutually_exclusive_group(required=True) -group.add_argument('--list', action='store_true', default=False, help='list available zones') +group.add_argument('--list', action='store_true', default=False, + help='list available zones') group.add_argument('--zone', type=int, nargs='+', help='download zone by id') group.add_argument('--domain', nargs='+', help='download zone by name') -parser.add_argument('--passwordstore', action='store', default=None, help='password store entry used for login to the portal') -parser.add_argument('dest', action='store', default='-', help='destination to store downloaded zone(s), - for stdout', nargs='?') +parser.add_argument('--passwordstore', action='store', default=None, + help='password store entry used for login to the portal') +parser.add_argument('dest', action='store', default='-', nargs='?', + help='destination for downloaded zone(s), - for stdout') args = parser.parse_args() -DNS_ADMIN = 'https://noc-portal.rz.rwth-aachen.de/dnsadmin' -ZONE_FILE = 'https://noc-portal.rz.rwth-aachen.de/dnsadmin/zones/{}-{}/pre_deploy_preview' +NOC_PREFIX = 'https://noc-portal.rz.rwth-aachen.de' +DNS_ADMIN = NOC_PREFIX + '/dnsadmin' +ZONE_FILE = DNS_ADMIN + '/zones/{}-{}/pre_deploy_preview' SHIB_PREFIX = 'https://sso.rwth-aachen.de' -SHIB_AUTH = 'https://sso.rwth-aachen.de/idp/profile/SAML2/Redirect/SSO' -SHIB_REDIRECT = 'https://noc-portal.rz.rwth-aachen.de/Shibboleth.sso/SAML2/POST' +SHIB_AUTH = SHIB_PREFIX + '/idp/profile/SAML2/Redirect/SSO' +SHIB_REDIRECT = NOC_PREFIX + '/Shibboleth.sso/SAML2/POST' if args.passwordstore: - prc = subprocess.run(['pass', 'show', args.passwordstore], stdout=subprocess.PIPE, check=True) + prc = subprocess.run(['pass', 'show', args.passwordstore], + stdout=subprocess.PIPE, check=True) USERNAME = prc.stdout.splitlines()[1].strip() PASSWORD = prc.stdout.splitlines()[0].strip() else: USERNAME = input('Username: ') PASSWORD = getpass.getpass() + def get_zones(session): r = session.get(DNS_ADMIN) if r.url.startswith(SHIB_AUTH): - res,session,r = shib_auth(session, r) + res, session, r = shib_auth(session, r) if not res: return {} b = bs4.BeautifulSoup(r.text, 'lxml') @@ -43,28 +49,35 @@ def get_zones(session): d = {} for zone in z.find_all('tr'): a = zone.find('td').find('a') - #d[int(a['href'].split('/')[-1])] = a.text d[a.text] = int(a['href'].split('/')[-1].split('-', maxsplit=1)[0]) return d + def get_zone_file(session, zone): - r = session.get(ZONE_FILE.format(str(zone[0]), str(zone[1].replace('.', '-')))) + r = session.get(ZONE_FILE.format(str(zone[0]), + str(zone[1].replace('.', '-')))) if r.url.startswith(SHIB_AUTH): - res,session,r = shib_auth(session, r) + res, session, r = shib_auth(session, r) if not res: return '' b = bs4.BeautifulSoup(r.text, 'lxml') t = b.find(id='content-wrapper').find('div', class_='zone-content').text - return t.replace('<br>', '').replace('\n\n', '\n') # \n + return t.replace('<br>', '').replace('\n\n', '\n') + def shib_auth(session, resp, iterations=0): b = bs4.BeautifulSoup(resp.text, 'lxml') form = b.find('form') data = {} if form['name'] == 'loginformular': - data={'j_username': USERNAME, 'j_password': PASSWORD, 'donotcache': 'true', '_shib_idp_revokeConsent': 'false', '_eventId_proceed': ''} + data = {'j_username': USERNAME, + 'j_password': PASSWORD, + 'donotcache': 'true', + '_shib_idp_revokeConsent': 'false', + '_eventId_proceed': '', + } elif form['name'] == 'form1': - data={'shib_idp_ls_exception.shib_idp_session_ss': '', + data = {'shib_idp_ls_exception.shib_idp_session_ss': '', 'shib_idp_ls_success.shib_idp_session_ss': 'false', 'shib_idp_ls_value.shib_idp_session_ss': '', 'shib_idp_ls_exception.shib_idp_persistent_ss': '', @@ -79,13 +92,15 @@ def shib_auth(session, resp, iterations=0): if not f.startswith(SHIB_AUTH[len(SHIB_PREFIX):]): relaystate = b.find('input', attrs={'name': 'RelayState'})['value'] samlresponse = b.find('input', attrs={'name': 'SAMLResponse'})['value'] - r = session.post(f, data={'RelayState': relaystate, 'SAMLResponse': samlresponse}) - return True,session,r + r = session.post(f, data={'RelayState': relaystate, + 'SAMLResponse': samlresponse}) + return True, session, r else: if iterations == 0: return shib_auth(session, r, 1) # authentication failed - return False,session,r + return False, session, r + s = requests.session() d = get_zones(s) @@ -111,7 +126,8 @@ if args.domain: wanted.append((d[k], k)) for a in args.domain: if a not in d: - print('Domain {} is not available, skipping'.format(a), file=sys.stderr) + print('Domain {} is not available, skipping'.format(a), + file=sys.stderr) else: zones = [] for k in d: @@ -120,7 +136,8 @@ else: zones.append(d[k]) for a in args.zone: if a not in zones: - print('Zone {} is not available, skipping'.format(a), file=sys.stderr) + print('Zone {} is not available, skipping'.format(a), + file=sys.stderr) if fp is None: for w in wanted: @@ -129,4 +146,3 @@ if fp is None: else: for w in wanted: fp.write(get_zone_file(s, w)) - -- GitLab