diff --git a/auth.py b/auth.py index 42a5f4c70e28b90627b22520d811699efcf18032..e24f82015f518a37b7f392823373535e5a874ef2 100644 --- a/auth.py +++ b/auth.py @@ -6,25 +6,28 @@ from ldap3.utils.dn import parse_dn from datetime import datetime class User: - def __init__(self, username, groups, timestamp=None): + def __init__(self, username, groups, timestamp=None, obsolete=False): self.username = username self.groups = groups if timestamp is not None: self.timestamp = timestamp else: self.timestamp = datetime.now() + self.obsolete = obsolete def summarize(self): - return "{}:{}:{}".format(self.username, ",".join(self.groups), str(self.timestamp.timestamp())) + return "{}:{}:{}:{}".format(self.username, ",".join(self.groups), str(self.timestamp.timestamp()), self.obsolete) @staticmethod def from_summary(summary): - parts = summary.split(":", 2) - if len(parts) != 3: + parts = summary.split(":", 3) + if len(parts) != 4: return None - name, groupstring, timestamp = parts - groups = groupstring.split(",") - return User(name, groups, datetime.fromtimestamp(float(timestamp))) + name, group_str, timestamp_str, obsolete_str = parts + timestamp = datetime.fromtimestamp(float(timestamp_str)) + obsolete = obsolete_str == "True" + groups = group_str.split(",") + return User(name, groups, timestamp, obsolete) @staticmethod def from_hashstring(secure_string): @@ -39,45 +42,20 @@ class UserManager: for backend in self.backends: if backend.authenticate(username, password): groups = backend.groups(username, password) - return User(username, groups) + return User(username, groups, obsolete=backend.obsolete) return None def all_groups(self): for backend in self.backends: yield from backend.all_groups() -class LegacyLdapManager: - def __init__(self, url, base): - self.connection = ldap.initialize(url) - self.base = base - - def login(self, username, password): - if not self.authenticate(username, password): - return None - groups = list(map(lambda g: g.decode("utf-8"), self.groups(username))) - return User(username, groups) - - def authenticate(self, username, password): - try: - self.connection.simple_bind_s("uid={},ou=users,{}".format(username, self.base), password) - return True - except ldap.INVALID_CREDENTIALS: - return False - return False - - def groups(self, username, password=None): - result = [] - # use username.lower() since memberUid is case sensitive here - for _, result_dict in self.connection.search_s(self.base, ldap.SCOPE_SUBTREE, "(memberUid={})".format(username.lower()), ["cn"]): - result.append(result_dict["cn"][0]) - return result - class LdapManager: - def __init__(self, host, user_dn, group_dn, port=636, use_ssl=True): + def __init__(self, host, user_dn, group_dn, port=636, use_ssl=True, obsolete=False): self.server = ldap3.Server(host, port=port, use_ssl=use_ssl) self.user_dn = user_dn self.group_dn = group_dn + self.obsolete = obsolete def authenticate(self, username, password): connection = ldap3.Connection(self.server, self.user_dn.format(username), password) @@ -99,9 +77,10 @@ class LdapManager: for group in group_reader.search(): yield group.cn.value + class ADManager: def __init__(self, host, domain, user_dn, group_dn, - port=636, use_ssl=True, ca_cert=None): + port=636, use_ssl=True, ca_cert=None, obsolete=False): tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED) if ca_cert is not None: tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED, @@ -111,6 +90,7 @@ class ADManager: self.domain = domain self.user_dn = user_dn self.group_dn = group_dn + self.obsolete = obsolete def prepare_connection(self, username=None, password=None): if username is not None and password is not None: