Commit fd40e94c authored by Robin Sonnabend's avatar Robin Sonnabend
Browse files

AD- and LDAP-Auth-Backends

Enable using multiple authentication backends.
(Samba-)AD and LDAP-Backends are implemented.
Session timeouts are implemented.

/close #17
parent 4cbada36
import ldap
import hmac, hashlib
import ldap3
from ldap3.utils.dn import parse_dn
from datetime import datetime
class User:
def __init__(self, username, groups):
def __init__(self, username, groups, timestamp=None):
self.username = username
self.groups = groups
if timestamp is not None:
self.timestamp = timestamp
else:
self.timestamp = datetime.now()
def summarize(self):
return "{}:{}".format(self.username, ",".join(self.groups))
return "{}:{}:{}".format(self.username, ",".join(self.groups), str(self.timestamp.timestamp()))
@staticmethod
def from_summary(summary):
name, groupstring = summary.split(":", 1)
parts = summary.split(":", 2)
if len(parts) != 3:
return None
name, groupstring, timestamp = parts
groups = groupstring.split(",")
return User(name, groups)
return User(name, groups, datetime.fromtimestamp(float(timestamp)))
@staticmethod
def from_hashstring(secure_string):
summary, hash = secure_string.split("=", 1)
return User.from_summary(summary)
class LdapManager:
class UserManager:
def __init__(self, backends):
self.backends = backends
def login(self, username, password):
for backend in self.backends:
if backend.authenticate(username, password):
groups = backend.groups(username, password)
return User(username, groups)
return None
def all_groups(self):
for backend in self.backends:
yield from backend.all_groups()
class LegacyLdapManager:
def __init__(self, url, base):
self.connection = ldap.initialize(url)
self.base = base
......@@ -39,16 +64,82 @@ class LdapManager:
return False
return False
def groups(self, username):
def groups(self, username, password=None):
result = []
# use username.lower() since memberUid is case sensitive here
for _, result_dict in self.connection.search_s(self.base, ldap.SCOPE_SUBTREE, "(memberUid={})".format(username.lower()), ["cn"]):
result.append(result_dict["cn"][0])
return result
class LdapManager:
def __init__(self, host, user_dn, group_dn, port=636, use_ssl=True):
self.server = ldap3.Server(host, port=port, use_ssl=use_ssl)
self.user_dn = user_dn
self.group_dn = group_dn
def authenticate(self, username, password):
connection = ldap3.Connection(self.server, self.user_dn.format(username), password)
return connection.bind()
def groups(self, username, password=None):
connection = ldap3.Connection(self.server)
obj_def = ldap3.ObjectDef("posixgroup", connection)
group_reader = ldap3.Reader(connection, obj_def, self.group_dn)
for group in group_reader.search():
members = group.memberUid.value
if members is not None and username in members:
yield group.cn.value
def all_groups(self):
connection = ldap3.Connection(self.server)
obj_def = ldap3.ObjectDef("posixgroup", connection)
group_reader = ldap3.Reader(connection, obj_def, self.group_dn)
for group in group_reader.search():
yield group.cn.value
class ADManager:
def __init__(self, host, domain, user_dn, group_dn, port=636, use_ssl=True):
self.server = ldap3.Server(host, port=port, use_ssl=use_ssl)
self.domain = domain
self.user_dn = user_dn
self.group_dn = group_dn
def prepare_connection(self, username=None, password=None):
if username is not None and password is not None:
ad_user = "{}\\{}".format(self.domain, username)
return ldap3.Connection(self.server, ad_user, password)
return ldap3.Connection(self.server)
def authenticate(self, username, password):
return self.prepare_connection(username, password).bind()
def groups(self, username, password):
connection = self.prepare_connection(username, password)
connection.bind()
obj_def = ldap3.ObjectDef("user", connection)
name_filter = "cn:={}".format(username)
user_reader = ldap3.Reader(connection, obj_def, self.user_dn, name_filter)
for result in user_reader.search():
for group_dn in result.memberOf:
group_dn_parts = parse_dn(group_dn)
if len(group_dn_parts) >= 1:
for group_dn in group_dn_parts:
key, group, next_char = group_dn
yield group
def all_groups(self):
connection = self.prepare_connection()
connection.bind()
obj_def = ldap3.ObjectDef("group", connection)
group_reader = ldap3.Reader(connection, obj_def, self.group_dn)
for result in reader.search():
yield result.name.value
class SecurityManager:
def __init__(self, key):
def __init__(self, key, max_duration=300):
self.maccer = hmac.new(key.encode("utf-8"), digestmod=hashlib.sha512)
self.max_duration = max_duration
def hash_user(self, user):
maccer = self.maccer.copy()
......@@ -64,4 +155,8 @@ class SecurityManager:
summary, hash = map(lambda s: s.encode("utf-8"), parts)
maccer = self.maccer.copy()
maccer.update(summary)
return hmac.compare_digest(maccer.hexdigest().encode("utf-8"), hash)
session_duration = datetime.now() - User.from_hashstring(string).timestamp
macs_equal = hmac.compare_digest(maccer.hexdigest().encode("utf-8"), hash)
time_short = int(session_duration.total_seconds()) < self.max_duration
return macs_equal and time_short
......@@ -66,7 +66,20 @@ CALENDAR_MAX_REQUESTS = 10
SESSION_PROTECTION = "strong" # do not change
# authentication
SECURITY_KEY = "some other random string" # change this
AUTH_MAX_DURATION = 300
AUTH_BACKENDS = [
LdapManager(
host="ldap.example.com",
user_dn="uid={},ou=users,dc=example,dc=com",
group_dn="dc=example,dc=com"),
ADManager(
host="ad.example.com",
domain="EXAMPLE",
user_dn="cn=users,dc=example,dc=com",
group_dn="dc=example,dc=com")
]
# lines of error description
ERROR_CONTEXT_LINES = 3
......
......@@ -20,7 +20,7 @@ import math
import mimetypes
import config
from shared import db, date_filter, datetime_filter, date_filter_long, date_filter_short, time_filter, time_filter_short, ldap_manager, security_manager, current_user, check_login, login_required, group_required, class_filter, needs_date_test, todostate_name_filter, code_filter, indent_tab_filter
from shared import db, date_filter, datetime_filter, date_filter_long, date_filter_short, time_filter, time_filter_short, user_manager, security_manager, current_user, check_login, login_required, group_required, class_filter, needs_date_test, todostate_name_filter, code_filter, indent_tab_filter
from utils import is_past, mail_manager, url_manager, get_first_unused_int, set_etherpad_text, get_etherpad_text, split_terms, optional_int_arg, fancy_join
from decorators import db_lookup, require_public_view_right, require_private_view_right, require_modify_right, require_admin_right
from models.database import ProtocolType, Protocol, DefaultTOP, TOP, LocalTOP, Document, Todo, Decision, MeetingReminder, Error, TodoMail, DecisionDocument, TodoState, Meta, DefaultMeta, DecisionCategory, Like
......@@ -1301,7 +1301,7 @@ def login():
return redirect(request.args.get("next") or url_for("index"))
form = LoginForm()
if form.validate_on_submit():
user = ldap_manager.login(form.username.data, form.password.data)
user = user_manager.login(form.username.data, form.password.data)
if user is not None:
session["auth"] = security_manager.hash_user(user)
flash("Login successful, {}!".format(user.username), "alert-success")
......
......@@ -91,11 +91,10 @@ def class_filter(obj):
def code_filter(text):
return "<code>{}</code>".format(text)
from auth import LdapManager, SecurityManager, User
ldap_manager = LdapManager(config.LDAP_PROVIDER_URL, config.LDAP_BASE)
security_manager = SecurityManager(config.SECURITY_KEY)
from auth import User
from auth import UserManager, SecurityManager, User
max_duration = getattr(config, "AUTH_MAX_DURATION")
user_manager = UserManager(backends=config.AUTH_BACKENDS)
security_manager = SecurityManager(config.SECURITY_KEY, max_duration)
def check_login():
return "auth" in session and security_manager.check_user(session["auth"])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment