auth.py 5.58 KB
Newer Older
1
2
import ldap
import hmac, hashlib
3
import ssl
Robin Sonnabend's avatar
Robin Sonnabend committed
4
5
6
import ldap3
from ldap3.utils.dn import parse_dn
from datetime import datetime
7
8

class User:
    """A logged-in user plus the session metadata that gets serialized.

    Instance fields:
      username  -- the login name given to the constructor
      groups    -- list of group names (any iterable is copied into a list)
      timestamp -- ``datetime`` of the login; ``datetime.now()`` if omitted
      obsolete  -- flag supplied by the caller (False by default)
    """

    def __init__(self, username, groups, timestamp=None, obsolete=False):
        self.username = username
        # Callers may pass a one-shot generator; copy it so the groups can be
        # read more than once (e.g. by repeated summarize() calls).
        self.groups = list(groups)
        self.timestamp = timestamp if timestamp is not None else datetime.now()
        self.obsolete = obsolete

    def summarize(self):
        """Return the user as ``username:group1,group2:timestamp:obsolete``."""
        return "{}:{}:{}:{}".format(
            self.username,
            ",".join(self.groups),
            str(self.timestamp.timestamp()),
            self.obsolete,
        )

    @staticmethod
    def from_summary(summary):
        """Parse a string produced by ``summarize``.

        Returns a ``User``, or ``None`` when the string does not have four
        colon-separated fields or its timestamp field is not a usable number.
        """
        parts = summary.split(":", 3)
        if len(parts) != 4:
            return None
        name, group_str, timestamp_str, obsolete_str = parts
        try:
            timestamp = datetime.fromtimestamp(float(timestamp_str))
        except (ValueError, OverflowError, OSError):
            # Not a number, or outside the range the platform can represent.
            return None
        obsolete = obsolete_str == "True"
        # "".split(",") would give [""], i.e. one group with an empty name.
        groups = group_str.split(",") if group_str else []
        return User(name, groups, timestamp, obsolete)

    @staticmethod
    def from_hashstring(secure_string):
        """Parse the summary part of a ``summary=hash`` string.

        The hash part is ignored here; it is NOT verified. Returns ``None``
        when there is no ``=`` separator or the summary cannot be parsed.
        """
        summary, separator, _ = secure_string.partition("=")
        if not separator:
            return None
        return User.from_summary(summary)

Robin Sonnabend's avatar
Robin Sonnabend committed
37
38
39
40
41
42
43
44
class UserManager:
    """Tries a list of authentication backends in order."""

    def __init__(self, backends):
        self.backends = backends

    def login(self, username, password):
        """Return a ``User`` from the first backend that accepts the
        credentials, or ``None`` if no backend does."""
        for candidate in self.backends:
            if not candidate.authenticate(username, password):
                continue
            memberships = candidate.groups(username, password)
            return User(username, memberships, obsolete=candidate.obsolete)
        return None

    def all_groups(self):
        """Yield every group reported by every backend, in backend order."""
        for candidate in self.backends:
            for group_name in candidate.all_groups():
                yield group_name


class LdapManager:
    """Authentication backend that binds against an LDAP server via ldap3.

    ``user_dn`` is a format string with one ``{}`` placeholder for the
    username; ``group_dn`` is the search base for ``posixgroup`` entries.
    """

    def __init__(self, host, user_dn, group_dn, port=636, use_ssl=True, obsolete=False):
        self.server = ldap3.Server(host, port=port, use_ssl=use_ssl)
        self.user_dn = user_dn
        self.group_dn = group_dn
        self.obsolete = obsolete

    def authenticate(self, username, password):
        """Return True if a simple bind as ``username`` succeeds.

        Empty or missing credentials are rejected up front.
        NOTE(review): without a password ldap3 is expected to fall back to an
        anonymous bind, which would succeed for any username on servers that
        allow anonymous access -- confirm against the ldap3 Connection docs.
        """
        if not username or not password:
            return False
        # NOTE(review): username is inserted into the DN unescaped; consider
        # escaping it if usernames can contain DN metacharacters.
        connection = ldap3.Connection(
            self.server,
            self.user_dn.format(username),
            password,
            authentication=ldap3.SIMPLE,
        )
        try:
            return connection.bind()
        finally:
            # Only the bind result is needed; do not keep the socket open.
            connection.unbind()

    def _group_reader(self):
        """Build a Reader over the ``posixgroup`` entries below ``group_dn``."""
        connection = ldap3.Connection(self.server)
        obj_def = ldap3.ObjectDef("posixgroup", connection)
        return ldap3.Reader(connection, obj_def, self.group_dn)

    def groups(self, username, password=None):
        """Yield the ``cn`` of every group listing ``username`` in memberUid.

        ``password`` is accepted for interface parity with other backends and
        is not used; the lookup runs on a connection without credentials.
        """
        for group in self._group_reader().search():
            # ``.values`` is always a list; ``.value`` collapses a single
            # member to a bare string, which would turn ``in`` into a
            # substring test.
            if username in group.memberUid.values:
                yield group.cn.value

    def all_groups(self):
        """Yield the ``cn`` of every ``posixgroup`` entry below ``group_dn``."""
        for group in self._group_reader().search():
            yield group.cn.value

80

Robin Sonnabend's avatar
Robin Sonnabend committed
81
class ADManager:
    """Authentication backend that binds against Active Directory via ldap3.

    Users bind as ``domain\\username``. ``user_dn`` and ``group_dn`` are the
    search bases for user and group entries. The server certificate is
    always required to validate; ``ca_cert`` optionally names a CA file.
    """

    def __init__(self, host, domain, user_dn, group_dn,
                 port=636, use_ssl=True, ca_cert=None, obsolete=False):
        # ca_certs_file=None leaves the CA file unset, so one call covers
        # both the "with CA file" and "without CA file" cases.
        tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED, ca_certs_file=ca_cert)
        self.server = ldap3.Server(host, port=port, use_ssl=use_ssl,
                                   tls=tls_config)
        self.domain = domain
        self.user_dn = user_dn
        self.group_dn = group_dn
        self.obsolete = obsolete

    def prepare_connection(self, username=None, password=None):
        """Return an unbound connection.

        With non-empty credentials it is set up for a simple bind as
        ``domain\\username``; otherwise no credentials are attached.
        """
        if username and password:
            ad_user = "{}\\{}".format(self.domain, username)
            return ldap3.Connection(self.server, ad_user, password,
                                    authentication=ldap3.SIMPLE)
        return ldap3.Connection(self.server)

    def authenticate(self, username, password):
        """Return True if a bind with the given credentials succeeds.

        Empty or missing credentials are rejected up front: a connection
        without credentials is a different kind of bind and must not count
        as a successful login.
        """
        if not username or not password:
            return False
        connection = self.prepare_connection(username, password)
        try:
            return connection.bind()
        finally:
            # Only the bind result is needed; do not keep the socket open.
            connection.unbind()

    def groups(self, username, password):
        """Yield the first RDN value of each ``memberOf`` DN of the user."""
        connection = self.prepare_connection(username, password)
        connection.bind()
        try:
            obj_def = ldap3.ObjectDef("user", connection)
            # NOTE(review): username goes into the query unescaped.
            name_filter = "cn:={}".format(username)
            user_reader = ldap3.Reader(connection, obj_def, self.user_dn, name_filter)
            for result in user_reader.search():
                for group_dn in result.memberOf:
                    group_dn_parts = parse_dn(group_dn)
                    if group_dn_parts:
                        # Each part is (attribute, value, separator).
                        yield group_dn_parts[0][1]
        finally:
            connection.unbind()

    def all_groups(self):
        """Yield the ``name`` of every ``group`` entry below ``group_dn``."""
        connection = self.prepare_connection()
        connection.bind()
        try:
            obj_def = ldap3.ObjectDef("group", connection)
            group_reader = ldap3.Reader(connection, obj_def, self.group_dn)
            # The original iterated an undefined name ``reader`` here.
            for result in group_reader.search():
                yield result.name.value
        finally:
            connection.unbind()

125
class SecurityManager:
    """Signs and verifies user summaries with an HMAC-SHA512 keyed MAC.

    ``key`` is the shared secret (a str, encoded as UTF-8);
    ``max_duration`` is the maximum token age in seconds.
    """

    def __init__(self, key, max_duration=300):
        self.maccer = hmac.new(key.encode("utf-8"), digestmod=hashlib.sha512)
        self.max_duration = max_duration

    def _mac(self, summary):
        """Return the hex MAC of ``summary`` (a str) under the stored key."""
        # Copy so the keyed base object is never fed any message data.
        maccer = self.maccer.copy()
        maccer.update(summary.encode("utf-8"))
        return maccer.hexdigest()

    def hash_user(self, user):
        """Return ``summary=mac`` for ``user.summarize()``."""
        summary = user.summarize()
        return "{}={}".format(summary, self._mac(summary))

    def check_user(self, string):
        """Return True if ``string`` is a correctly signed, unexpired token.

        The MAC is verified before the summary is parsed, so forged or
        malformed input is rejected without being interpreted.
        """
        summary, separator, received_mac = string.partition("=")
        if not separator:
            # wrong format, expecting summary=hash
            return False
        # compare_digest needs bytes here: str arguments must be ASCII-only.
        macs_equal = hmac.compare_digest(
            self._mac(summary).encode("utf-8"),
            received_mac.encode("utf-8"),
        )
        if not macs_equal:
            return False
        try:
            user = User.from_hashstring(string)
        except (ValueError, OverflowError, OSError):
            # Signed but unparseable; treat as invalid rather than crash.
            return False
        if user is None:
            return False
        session_duration = datetime.now() - user.timestamp
        return int(session_duration.total_seconds()) < self.max_duration