Skip to content
Snippets Groups Projects
Commit c501e6cb authored by Robin Sonnabend's avatar Robin Sonnabend
Browse files

Enable marking authentication backends deprecated

And show a warning if a user logs in with one of them.
parent 354f5df2
No related branches found
No related tags found
No related merge requests found
...@@ -6,25 +6,28 @@ from ldap3.utils.dn import parse_dn ...@@ -6,25 +6,28 @@ from ldap3.utils.dn import parse_dn
from datetime import datetime from datetime import datetime
class User: class User:
def __init__(self, username, groups, timestamp=None): def __init__(self, username, groups, timestamp=None, obsolete=False):
self.username = username self.username = username
self.groups = groups self.groups = groups
if timestamp is not None: if timestamp is not None:
self.timestamp = timestamp self.timestamp = timestamp
else: else:
self.timestamp = datetime.now() self.timestamp = datetime.now()
self.obsolete = obsolete
def summarize(self): def summarize(self):
return "{}:{}:{}".format(self.username, ",".join(self.groups), str(self.timestamp.timestamp())) return "{}:{}:{}:{}".format(self.username, ",".join(self.groups), str(self.timestamp.timestamp()), self.obsolete)
@staticmethod @staticmethod
def from_summary(summary): def from_summary(summary):
parts = summary.split(":", 2) parts = summary.split(":", 3)
if len(parts) != 3: if len(parts) != 4:
return None return None
name, groupstring, timestamp = parts name, group_str, timestamp_str, obsolete_str = parts
groups = groupstring.split(",") timestamp = datetime.fromtimestamp(float(timestamp_str))
return User(name, groups, datetime.fromtimestamp(float(timestamp))) obsolete = obsolete_str == "True"
groups = group_str.split(",")
return User(name, groups, timestamp, obsolete)
@staticmethod @staticmethod
def from_hashstring(secure_string): def from_hashstring(secure_string):
...@@ -39,45 +42,20 @@ class UserManager: ...@@ -39,45 +42,20 @@ class UserManager:
for backend in self.backends: for backend in self.backends:
if backend.authenticate(username, password): if backend.authenticate(username, password):
groups = backend.groups(username, password) groups = backend.groups(username, password)
return User(username, groups) return User(username, groups, obsolete=backend.obsolete)
return None return None
def all_groups(self): def all_groups(self):
for backend in self.backends: for backend in self.backends:
yield from backend.all_groups() yield from backend.all_groups()
class LegacyLdapManager:
def __init__(self, url, base):
self.connection = ldap.initialize(url)
self.base = base
def login(self, username, password):
if not self.authenticate(username, password):
return None
groups = list(map(lambda g: g.decode("utf-8"), self.groups(username)))
return User(username, groups)
def authenticate(self, username, password):
try:
self.connection.simple_bind_s("uid={},ou=users,{}".format(username, self.base), password)
return True
except ldap.INVALID_CREDENTIALS:
return False
return False
def groups(self, username, password=None):
result = []
# use username.lower() since memberUid is case sensitive here
for _, result_dict in self.connection.search_s(self.base, ldap.SCOPE_SUBTREE, "(memberUid={})".format(username.lower()), ["cn"]):
result.append(result_dict["cn"][0])
return result
class LdapManager: class LdapManager:
def __init__(self, host, user_dn, group_dn, port=636, use_ssl=True): def __init__(self, host, user_dn, group_dn, port=636, use_ssl=True, obsolete=False):
self.server = ldap3.Server(host, port=port, use_ssl=use_ssl) self.server = ldap3.Server(host, port=port, use_ssl=use_ssl)
self.user_dn = user_dn self.user_dn = user_dn
self.group_dn = group_dn self.group_dn = group_dn
self.obsolete = obsolete
def authenticate(self, username, password): def authenticate(self, username, password):
connection = ldap3.Connection(self.server, self.user_dn.format(username), password) connection = ldap3.Connection(self.server, self.user_dn.format(username), password)
...@@ -99,9 +77,10 @@ class LdapManager: ...@@ -99,9 +77,10 @@ class LdapManager:
for group in group_reader.search(): for group in group_reader.search():
yield group.cn.value yield group.cn.value
class ADManager: class ADManager:
def __init__(self, host, domain, user_dn, group_dn, def __init__(self, host, domain, user_dn, group_dn,
port=636, use_ssl=True, ca_cert=None): port=636, use_ssl=True, ca_cert=None, obsolete=False):
tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED) tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED)
if ca_cert is not None: if ca_cert is not None:
tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED, tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED,
...@@ -111,6 +90,7 @@ class ADManager: ...@@ -111,6 +90,7 @@ class ADManager:
self.domain = domain self.domain = domain
self.user_dn = user_dn self.user_dn = user_dn
self.group_dn = group_dn self.group_dn = group_dn
self.obsolete = obsolete
def prepare_connection(self, username=None, password=None): def prepare_connection(self, username=None, password=None):
if username is not None and password is not None: if username is not None and password is not None:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment